2
2
* Copyright Strimzi authors.
3
3
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4
4
*/
5
- package io .strimzi .kafka .metrics ;
5
+ package io .strimzi .kafka .metrics . prometheus ;
6
6
7
7
import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
8
8
import io .prometheus .metrics .model .registry .PrometheusRegistry ;
9
9
import io .prometheus .metrics .model .snapshots .PrometheusNaming ;
10
- import io .strimzi .kafka .metrics .http .HttpServers ;
11
- import io .strimzi .kafka .metrics .kafka .KafkaCollector ;
12
- import io .strimzi .kafka .metrics .kafka .KafkaMetricWrapper ;
10
+ import io .strimzi .kafka .metrics .prometheus .common .AbstractReporter ;
11
+ import io .strimzi .kafka .metrics .prometheus .common .MetricWrapper ;
12
+ import io .strimzi .kafka .metrics .prometheus .common .PrometheusCollector ;
13
+ import io .strimzi .kafka .metrics .prometheus .http .HttpServers ;
14
+ import io .strimzi .kafka .metrics .prometheus .kafka .KafkaCollector ;
15
+ import io .strimzi .kafka .metrics .prometheus .kafka .KafkaMetricWrapper ;
13
16
import org .apache .kafka .common .config .ConfigException ;
14
17
import org .apache .kafka .common .metrics .KafkaMetric ;
15
18
import org .apache .kafka .common .metrics .MetricsContext ;
16
19
import org .apache .kafka .common .metrics .MetricsReporter ;
17
20
import org .slf4j .Logger ;
18
21
import org .slf4j .LoggerFactory ;
19
22
20
- import java .util .Collections ;
21
23
import java .util .List ;
22
24
import java .util .Map ;
23
25
import java .util .Optional ;
24
26
import java .util .Set ;
27
+ import java .util .regex .Pattern ;
25
28
26
29
/**
27
- * MetricsReporter implementation that expose Kafka metrics in the Prometheus format.
28
- * This can be used by Kafka brokers and clients.
30
+ * {@link MetricsReporter} implementation that exposes Kafka client metrics in the Prometheus format.
29
31
*/
30
- public class KafkaPrometheusMetricsReporter implements MetricsReporter {
31
-
32
- private static final Logger LOG = LoggerFactory .getLogger (KafkaPrometheusMetricsReporter .class );
33
-
34
- private final PrometheusRegistry registry ;
35
- private final KafkaCollector kafkaCollector ;
36
- @ SuppressFBWarnings ({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR" }) // This field is initialized in the configure method
37
- private PrometheusMetricsReporterConfig config ;
38
- @ SuppressFBWarnings ({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR" }) // This field is initialized in the configure method
39
- private Optional <HttpServers .ServerCounter > httpServer ;
32
+ public class ClientMetricsReporter extends AbstractReporter implements MetricsReporter {
33
+
34
+ private static final Logger LOG = LoggerFactory .getLogger (ClientMetricsReporter .class );
35
+ static final Set <String > PREFIXES = Set .of (
36
+ "kafka.admin.client" ,
37
+ "kafka.consumer" ,
38
+ "kafka.producer" ,
39
+ "kafka.connect" ,
40
+ "kafka.streams"
41
+ );
42
+
43
+ final PrometheusRegistry registry ;
44
+ final KafkaCollector kafkaCollector ;
45
+
46
+ ClientMetricsReporterConfig config ;
47
+ Optional <HttpServers .ServerCounter > httpServer = Optional .empty ();
40
48
@ SuppressFBWarnings ({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR" }) // This field is initialized in the contextChange method
41
- private String prefix ;
49
+ String prefix ;
42
50
43
51
/**
44
52
* Constructor
45
53
*/
46
- public KafkaPrometheusMetricsReporter () {
54
+ public ClientMetricsReporter () {
47
55
registry = PrometheusRegistry .defaultRegistry ;
48
56
kafkaCollector = KafkaCollector .getCollector (PrometheusCollector .register (registry ));
57
+ kafkaCollector .addReporter (this );
49
58
}
50
59
51
60
// for testing
52
- KafkaPrometheusMetricsReporter (PrometheusRegistry registry , KafkaCollector kafkaCollector ) {
61
+ ClientMetricsReporter (PrometheusRegistry registry , KafkaCollector kafkaCollector ) {
53
62
this .registry = registry ;
54
63
this .kafkaCollector = kafkaCollector ;
64
+ kafkaCollector .addReporter (this );
55
65
}
56
66
57
67
@ Override
58
68
public void configure (Map <String , ?> map ) {
59
- config = new PrometheusMetricsReporterConfig (map , registry );
69
+ config = new ClientMetricsReporterConfig (map , registry );
60
70
httpServer = config .startHttpServer ();
61
- LOG .debug ("KafkaPrometheusMetricsReporter configured with {}" , config );
71
+ LOG .debug ("ClientMetricsReporter configured with {}" , config );
62
72
}
63
73
64
74
@ Override
@@ -70,21 +80,18 @@ public void init(List<KafkaMetric> metrics) {
70
80
71
81
public void metricChange (KafkaMetric metric ) {
72
82
String prometheusName = KafkaMetricWrapper .prometheusName (prefix , metric .metricName ());
73
- if (!config .isAllowed (prometheusName )) {
74
- LOG .trace ("Ignoring metric {} as it does not match the allowlist" , prometheusName );
75
- } else {
76
- MetricWrapper metricWrapper = new KafkaMetricWrapper (prometheusName , metric , metric .metricName ().name ());
77
- kafkaCollector .addMetric (metric .metricName (), metricWrapper );
78
- }
83
+ MetricWrapper metricWrapper = new KafkaMetricWrapper (prometheusName , metric , metric .metricName ().name ());
84
+ addMetric (metric , metricWrapper );
79
85
}
80
86
81
87
@ Override
82
88
public void metricRemoval (KafkaMetric metric ) {
83
- kafkaCollector . removeMetric (metric . metricName () );
89
+ removeMetric (metric );
84
90
}
85
91
86
92
@ Override
87
93
public void close () {
94
+ kafkaCollector .removeReporter (this );
88
95
httpServer .ifPresent (HttpServers ::release );
89
96
}
90
97
@@ -98,17 +105,26 @@ public void validateReconfiguration(Map<String, ?> configs) throws ConfigExcepti
98
105
99
106
@ Override
100
107
public Set <String > reconfigurableConfigs () {
101
- return Collections . emptySet ();
108
+ return Set . of ();
102
109
}
103
110
104
111
@ Override
105
112
public void contextChange (MetricsContext metricsContext ) {
106
113
String prefix = metricsContext .contextLabels ().get (MetricsContext .NAMESPACE );
114
+ if (!PREFIXES .contains (prefix )) {
115
+ throw new IllegalStateException ("ClientMetricsReporter should only be used in Kafka servers" );
116
+ }
107
117
this .prefix = PrometheusNaming .prometheusName (prefix );
108
118
}
109
119
110
120
// for testing
111
121
Optional <Integer > getPort () {
112
122
return Optional .ofNullable (httpServer .isPresent () ? httpServer .get ().port () : null );
113
123
}
124
+
125
+ @ Override
126
+ protected Pattern allowlist () {
127
+ return config .allowlist ();
128
+ }
129
+
114
130
}
0 commit comments