Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split the reporter into client and server modules #73

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions .azure/templates/jobs/build_java.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@ jobs:
- template: '../steps/prerequisites/install_java.yaml'
parameters:
JDK_VERSION: $(jdk_version)
- bash: "mvn ${MVN_ARGS} verify"
displayName: "Build & Test Java"
- bash: "mvn ${MVN_ARGS} verify spotbugs:check"
displayName: "Build & Test Java + Spotbugs"
env:
BUILD_REASON: $(Build.Reason)
BRANCH: $(Build.SourceBranch)
TESTCONTAINERS_RYUK_DISABLED: "TRUE"
TESTCONTAINERS_CHECKS_DISABLE: "TRUE"
MVN_ARGS: "-e -V -B"
- bash: "mvn ${MVN_ARGS} spotbugs:check"
displayName: "Spotbugs"
env:
MVN_ARGS: "-e -V -B"
# We have to TAR the target directory to maintain the permissions of
# the files which would otherwise change when downloading the artifact
- bash: tar -cvpf target.tar ./target
Expand Down
30 changes: 17 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,50 +34,54 @@ The metrics reporter has the following configurations:

## Running

### Kafka Brokers
### Kafka Brokers and Controllers

To use the reporter with Kafka brokers and controllers, add the following to your broker configuration:

To use the reporter with Kafka brokers, add the following to your broker configuration:
```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
kafka.metrics.reporters=io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter
metric.reporters=io.strimzi.kafka.metrics.prometheus.ServerKafkaMetricsReporter
kafka.metrics.reporters=io.strimzi.kafka.metrics.prometheus.ServerYammerMetricsReporter
auto.include.jmx.reporter=false
```

### Kafka Clients

To use the reporter with Kafka producers, consumers or admin clients, add the following to your client configuration:

```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
auto.include.jmx.reporter=false
```

### Kafka Connect and Kafka Streams

To use the reporter with Kafka Connect and Kafka Streams, add the following to your Connect runtime or Streams application configuration:

```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
auto.include.jmx.reporter=false
admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
admin.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
admin.auto.include.jmx.reporter=false
producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
producer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
producer.auto.include.jmx.reporter=false
consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
consumer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
consumer.auto.include.jmx.reporter=false
```

When setting configurations for the Prometheus metrics reporter, they also need to be set with the `admin.`, `producer.` and `consumer.`.
For example, to set the `listener` to `http://:8081`:

```properties
metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
prometheus.metrics.reporter.listener=http://:8081
auto.include.jmx.reporter=false
admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
admin.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
admin.prometheus.metrics.reporter.listener=http://:8081
admin.auto.include.jmx.reporter=false
producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
producer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
producer.prometheus.metrics.reporter.listener=http://:8081
producer.auto.include.jmx.reporter=false
consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
consumer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
consumer.prometheus.metrics.reporter.listener=http://:8081
consumer.auto.include.jmx.reporter=false
```
Expand Down
75 changes: 75 additions & 0 deletions client-metrics-reporter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.strimzi</groupId>
<artifactId>metrics-reporter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<artifactId>client-metrics-reporter</artifactId>

<dependencies>
<dependency>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For multi module projects, maven best practice is to express shared dependencies in the parent POM using a dependencyManagement section. You can then remove the version numbers from the dependencies in the child poms.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip! I already had the versions defined in the parent POM so I removed the versions from the modules.

<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
</dependency>

<dependency>
<groupId>io.prometheus</groupId>
<artifactId>prometheus-metrics-model</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>prometheus-metrics-instrumentation-jvm</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>prometheus-metrics-exporter-httpserver</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,73 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;
package io.strimzi.kafka.metrics.prometheus;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import io.strimzi.kafka.metrics.http.HttpServers;
import io.strimzi.kafka.metrics.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.kafka.KafkaMetricWrapper;
import io.strimzi.kafka.metrics.prometheus.common.AbstractReporter;
import io.strimzi.kafka.metrics.prometheus.common.MetricWrapper;
import io.strimzi.kafka.metrics.prometheus.common.PrometheusCollector;
import io.strimzi.kafka.metrics.prometheus.http.HttpServers;
import io.strimzi.kafka.metrics.prometheus.kafka.KafkaCollector;
import io.strimzi.kafka.metrics.prometheus.kafka.KafkaMetricWrapper;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

/**
* MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
* This can be used by Kafka brokers and clients.
* {@link MetricsReporter} implementation that exposes Kafka client metrics in the Prometheus format.
*/
public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
private final KafkaCollector kafkaCollector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HttpServers.ServerCounter> httpServer;
public class ClientMetricsReporter extends AbstractReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsReporter.class);
static final Set<String> PREFIXES = Set.of(
"kafka.admin.client",
"kafka.consumer",
"kafka.producer",
"kafka.connect",
"kafka.streams"
);

final PrometheusRegistry registry;
final KafkaCollector kafkaCollector;

private ClientMetricsReporterConfig config;
Optional<HttpServers.ServerCounter> httpServer = Optional.empty();
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
private String prefix;
String prefix;

/**
* Constructor
*/
public KafkaPrometheusMetricsReporter() {
public ClientMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
kafkaCollector = KafkaCollector.getCollector(PrometheusCollector.register(registry));
kafkaCollector.addReporter(this);
}

// for testing
KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
ClientMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
this.registry = registry;
this.kafkaCollector = kafkaCollector;
kafkaCollector.addReporter(this);
}

@Override
public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
config = new ClientMetricsReporterConfig(map, registry);
httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
LOG.debug("ClientMetricsReporter configured with {}", config);
}

@Override
Expand All @@ -68,23 +78,21 @@ public void init(List<KafkaMetric> metrics) {
}
}

@Override
public void metricChange(KafkaMetric metric) {
String prometheusName = KafkaMetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
kafkaCollector.addMetric(metric.metricName(), metricWrapper);
}
MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
addMetric(metric, metricWrapper);
}

@Override
public void metricRemoval(KafkaMetric metric) {
kafkaCollector.removeMetric(metric.metricName());
removeMetric(metric);
}

@Override
public void close() {
kafkaCollector.removeReporter(this);
httpServer.ifPresent(HttpServers::release);
}

Expand All @@ -98,17 +106,26 @@ public void validateReconfiguration(Map<String, ?> configs) throws ConfigExcepti

@Override
public Set<String> reconfigurableConfigs() {
return Collections.emptySet();
return Set.of();
}

@Override
public void contextChange(MetricsContext metricsContext) {
String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
if (!PREFIXES.contains(prefix)) {
throw new IllegalStateException("ClientMetricsReporter should only be used in Kafka clients");
}
this.prefix = PrometheusNaming.prometheusName(prefix);
}

// for testing
Optional<Integer> getPort() {
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().port() : null);
}

@Override
protected Pattern allowlist() {
return config.allowlist();
}

}
Loading