diff --git a/.azure/templates/jobs/build_java.yaml b/.azure/templates/jobs/build_java.yaml
index c418337..acb05e1 100644
--- a/.azure/templates/jobs/build_java.yaml
+++ b/.azure/templates/jobs/build_java.yaml
@@ -28,18 +28,14 @@ jobs:
- template: '../steps/prerequisites/install_java.yaml'
parameters:
JDK_VERSION: $(jdk_version)
- - bash: "mvn ${MVN_ARGS} verify"
- displayName: "Build & Test Java"
+ - bash: "mvn ${MVN_ARGS} verify spotbugs:check"
+ displayName: "Build & Test Java + Spotbugs"
env:
BUILD_REASON: $(Build.Reason)
BRANCH: $(Build.SourceBranch)
TESTCONTAINERS_RYUK_DISABLED: "TRUE"
TESTCONTAINERS_CHECKS_DISABLE: "TRUE"
MVN_ARGS: "-e -V -B"
- - bash: "mvn ${MVN_ARGS} spotbugs:check"
- displayName: "Spotbugs"
- env:
- MVN_ARGS: "-e -V -B"
# We have to TAR the target directory to maintain the permissions of
# the files which would otherwise change when downloading the artifact
- bash: tar -cvpf target.tar ./target
diff --git a/README.md b/README.md
index 0836744..c9832ed 100644
--- a/README.md
+++ b/README.md
@@ -34,50 +34,54 @@ The metrics reporter has the following configurations:
## Running
-### Kafka Brokers
+### Kafka Brokers and Controllers
+
+To use the reporter with Kafka brokers and controllers, add the following to your broker configuration:
-To use the reporter with Kafka brokers, add the following to your broker configuration:
```properties
-metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
-kafka.metrics.reporters=io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter
+metric.reporters=io.strimzi.kafka.metrics.prometheus.ServerKafkaMetricsReporter
+kafka.metrics.reporters=io.strimzi.kafka.metrics.prometheus.ServerYammerMetricsReporter
auto.include.jmx.reporter=false
```
### Kafka Clients
To use the reporter with Kafka producers, consumers or admin clients, add the following to your client configuration:
+
```properties
-metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
auto.include.jmx.reporter=false
```
### Kafka Connect and Kafka Streams
To use the reporter with Kafka Connect and Kafka Streams, add the following to your Connect runtime or Streams application configuration:
+
```properties
-metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
auto.include.jmx.reporter=false
-admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+admin.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
admin.auto.include.jmx.reporter=false
-producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+producer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
producer.auto.include.jmx.reporter=false
-consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+consumer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
consumer.auto.include.jmx.reporter=false
```
When setting configurations for the Prometheus metrics reporter, they also need to be set with the `admin.`, `producer.` and `consumer.`.
For example, to set the `listener` to `http://:8081`:
+
```properties
-metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
prometheus.metrics.reporter.listener=http://:8081
auto.include.jmx.reporter=false
-admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+admin.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
admin.prometheus.metrics.reporter.listener=http://:8081
admin.auto.include.jmx.reporter=false
-producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+producer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
producer.prometheus.metrics.reporter.listener=http://:8081
producer.auto.include.jmx.reporter=false
-consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter
+consumer.metric.reporters=io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter
consumer.prometheus.metrics.reporter.listener=http://:8081
consumer.auto.include.jmx.reporter=false
```
diff --git a/client-metrics-reporter/pom.xml b/client-metrics-reporter/pom.xml
new file mode 100644
index 0000000..8314ea4
--- /dev/null
+++ b/client-metrics-reporter/pom.xml
@@ -0,0 +1,75 @@
+
+
+ 4.0.0
+
+ io.strimzi
+ metrics-reporter
+ 1.0.0-SNAPSHOT
+
+
+ client-metrics-reporter
+
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.github.spotbugs
+ spotbugs-annotations
+
+
+
+ io.prometheus
+ prometheus-metrics-model
+
+
+ io.prometheus
+ prometheus-metrics-instrumentation-jvm
+
+
+ io.prometheus
+ prometheus-metrics-exporter-httpserver
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+
+
+ org.slf4j
+ slf4j-simple
+
+
+ io.strimzi
+ strimzi-test-container
+
+
+ org.testcontainers
+ testcontainers
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.4.2
+
+
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporter.java
similarity index 53%
rename from src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java
rename to client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporter.java
index 7c39995..08af69e 100644
--- a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java
+++ b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporter.java
@@ -2,14 +2,17 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.kafka.metrics;
+package io.strimzi.kafka.metrics.prometheus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
-import io.strimzi.kafka.metrics.http.HttpServers;
-import io.strimzi.kafka.metrics.kafka.KafkaCollector;
-import io.strimzi.kafka.metrics.kafka.KafkaMetricWrapper;
+import io.strimzi.kafka.metrics.prometheus.common.AbstractReporter;
+import io.strimzi.kafka.metrics.prometheus.common.MetricWrapper;
+import io.strimzi.kafka.metrics.prometheus.common.PrometheusCollector;
+import io.strimzi.kafka.metrics.prometheus.http.HttpServers;
+import io.strimzi.kafka.metrics.prometheus.kafka.KafkaCollector;
+import io.strimzi.kafka.metrics.prometheus.kafka.KafkaMetricWrapper;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
@@ -17,48 +20,55 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.regex.Pattern;
/**
- * MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
- * This can be used by Kafka brokers and clients.
+ * {@link MetricsReporter} implementation that exposes Kafka client metrics in the Prometheus format.
*/
-public class KafkaPrometheusMetricsReporter implements MetricsReporter {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);
-
- private final PrometheusRegistry registry;
- private final KafkaCollector kafkaCollector;
- @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
- private PrometheusMetricsReporterConfig config;
- @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
- private Optional httpServer;
+public class ClientMetricsReporter extends AbstractReporter implements MetricsReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsReporter.class);
+ static final Set PREFIXES = Set.of(
+ "kafka.admin.client",
+ "kafka.consumer",
+ "kafka.producer",
+ "kafka.connect",
+ "kafka.streams"
+ );
+
+ final PrometheusRegistry registry;
+ final KafkaCollector kafkaCollector;
+
+ private ClientMetricsReporterConfig config;
+ Optional httpServer = Optional.empty();
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
- private String prefix;
+ String prefix;
/**
* Constructor
*/
- public KafkaPrometheusMetricsReporter() {
+ public ClientMetricsReporter() {
registry = PrometheusRegistry.defaultRegistry;
kafkaCollector = KafkaCollector.getCollector(PrometheusCollector.register(registry));
+ kafkaCollector.addReporter(this);
}
// for testing
- KafkaPrometheusMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
+ ClientMetricsReporter(PrometheusRegistry registry, KafkaCollector kafkaCollector) {
this.registry = registry;
this.kafkaCollector = kafkaCollector;
+ kafkaCollector.addReporter(this);
}
@Override
public void configure(Map map) {
- config = new PrometheusMetricsReporterConfig(map, registry);
+ config = new ClientMetricsReporterConfig(map, registry);
httpServer = config.startHttpServer();
- LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
+ LOG.debug("ClientMetricsReporter configured with {}", config);
}
@Override
@@ -68,23 +78,21 @@ public void init(List metrics) {
}
}
+ @Override
public void metricChange(KafkaMetric metric) {
String prometheusName = KafkaMetricWrapper.prometheusName(prefix, metric.metricName());
- if (!config.isAllowed(prometheusName)) {
- LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
- } else {
- MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
- kafkaCollector.addMetric(metric.metricName(), metricWrapper);
- }
+ MetricWrapper metricWrapper = new KafkaMetricWrapper(prometheusName, metric, metric.metricName().name());
+ addMetric(metric, metricWrapper);
}
@Override
public void metricRemoval(KafkaMetric metric) {
- kafkaCollector.removeMetric(metric.metricName());
+ removeMetric(metric);
}
@Override
public void close() {
+ kafkaCollector.removeReporter(this);
httpServer.ifPresent(HttpServers::release);
}
@@ -98,12 +106,15 @@ public void validateReconfiguration(Map configs) throws ConfigExcepti
@Override
public Set reconfigurableConfigs() {
- return Collections.emptySet();
+ return Set.of();
}
@Override
public void contextChange(MetricsContext metricsContext) {
String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
+ if (!PREFIXES.contains(prefix)) {
+ throw new IllegalStateException("ClientMetricsReporter should only be used in Kafka clients");
+ }
this.prefix = PrometheusNaming.prometheusName(prefix);
}
@@ -111,4 +122,10 @@ public void contextChange(MetricsContext metricsContext) {
Optional getPort() {
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().port() : null);
}
+
+ @Override
+ protected Pattern allowlist() {
+ return config.allowlist();
+ }
+
}
diff --git a/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporterConfig.java
similarity index 84%
rename from src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java
rename to client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporterConfig.java
index ac2c2a0..4aeb629 100644
--- a/src/main/java/io/strimzi/kafka/metrics/PrometheusMetricsReporterConfig.java
+++ b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/ClientMetricsReporterConfig.java
@@ -2,12 +2,12 @@
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
-package io.strimzi.kafka.metrics;
+package io.strimzi.kafka.metrics.prometheus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
-import io.strimzi.kafka.metrics.http.HttpServers;
-import io.strimzi.kafka.metrics.http.Listener;
+import io.strimzi.kafka.metrics.prometheus.http.HttpServers;
+import io.strimzi.kafka.metrics.prometheus.http.Listener;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -22,11 +22,11 @@
import java.util.stream.Collectors;
/**
-* Configuration for the PrometheusMetricsReporter implementation.
+* Configuration for {@link ClientMetricsReporter}.
*/
-public class PrometheusMetricsReporterConfig extends AbstractConfig {
+public class ClientMetricsReporterConfig extends AbstractConfig {
- private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsReporterConfig.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsReporterConfig.class);
private static final String CONFIG_PREFIX = "prometheus.metrics.reporter.";
/**
@@ -62,15 +62,15 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
public static final String ALLOWLIST_CONFIG_DEFAULT = ".*";
private static final String ALLOWLIST_CONFIG_DOC = "A comma separated list of regex patterns to specify the metrics to collect.";
- private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new Listener.ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC)
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC)
.define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC);
- private final Listener listener;
- private final boolean listenerEnabled;
- private final Pattern allowlist;
- private final PrometheusRegistry registry;
+ final Listener listener;
+ final boolean listenerEnabled;
+ final PrometheusRegistry registry;
+ final Pattern allowlist;
/**
* Constructor.
@@ -79,7 +79,7 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
* @param registry the metrics registry
*/
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
- public PrometheusMetricsReporterConfig(Map, ?> props, PrometheusRegistry registry) {
+ public ClientMetricsReporterConfig(Map, ?> props, PrometheusRegistry registry) {
super(CONFIG_DEF, props);
this.listener = Listener.parseListener(getString(LISTENER_CONFIG));
this.allowlist = compileAllowlist(getList(ALLOWLIST_CONFIG));
@@ -97,7 +97,15 @@ public boolean isAllowed(String name) {
return allowlist.matcher(name).matches();
}
- private Pattern compileAllowlist(List allowlist) {
+ /**
+ * The configured allowlist.
+ * @return The Pattern for the allowlist
+ */
+ public Pattern allowlist() {
+ return allowlist;
+ }
+
+ Pattern compileAllowlist(List allowlist) {
for (String entry : allowlist) {
try {
Pattern.compile(entry);
@@ -129,7 +137,7 @@ public boolean isListenerEnabled() {
@Override
public String toString() {
- return "PrometheusMetricsReporterConfig{" +
+ return "ClientMetricsReporterConfig{" +
", listener=" + listener +
", listenerEnabled=" + listenerEnabled +
", allowlist=" + allowlist +
diff --git a/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/common/AbstractReporter.java b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/common/AbstractReporter.java
new file mode 100644
index 0000000..b2fb9bb
--- /dev/null
+++ b/client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/common/AbstractReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.metrics.prometheus.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+/**
+ * Common reporter logic to track metrics that match an allowlist pattern. This filters the metrics as they are added
+ * and removed so when Prometheus scrapes the /metrics endpoint, we just have to convert them to the Prometheus format.
+ */
+public abstract class AbstractReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReporter.class);
+
+ // Metrics that match the allowlist
+ private final Map