Skip to content

Commit adef734

Browse files
Add metrics support for Temporal Workflow (#4095)
1 parent 09a3e21 commit adef734

File tree

7 files changed

+237
-1
lines changed

7 files changed

+237
-1
lines changed

gobblin-temporal/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ dependencies {
6464
}
6565
compile externalDependency.tdigest
6666
compile externalDependency."temporal-sdk"
67+
compile externalDependency.micrometerCore
68+
compile externalDependency.micrometerRegistry
6769
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
6870
testCompile project(":gobblin-example")
6971

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java

+10
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,14 @@ public interface GobblinTemporalConfigurationKeys {
7474

7575
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds";
7676
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
77+
78+
/**
79+
* Temporal metrics config properties
80+
*/
81+
String TEMPORAL_METRICS_PREFIX = PREFIX + "metrics.";
82+
String TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT = TEMPORAL_METRICS_PREFIX + "otlp";
83+
String TEMPORAL_METRICS_OTLP_HEADERS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers";
84+
String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
85+
int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
86+
String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
7787
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java

+14
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import com.typesafe.config.Config;
3232
import com.typesafe.config.ConfigFactory;
33+
import com.uber.m3.tally.RootScopeBuilder;
34+
import com.uber.m3.tally.Scope;
3335

3436
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
3537
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
@@ -41,7 +43,10 @@
4143
import javax.net.ssl.TrustManagerFactory;
4244

4345
import org.apache.gobblin.cluster.GobblinClusterUtils;
46+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
4447
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
48+
import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
49+
import org.apache.gobblin.util.ConfigUtils;
4550

4651

4752
public class TemporalWorkflowClientFactory {
@@ -100,10 +105,19 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t
100105
.ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES)
101106
.build();
102107

108+
// Initialize metrics
109+
int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
110+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
111+
Scope metricsScope = new RootScopeBuilder()
112+
.reporter(TemporalMetricsHelper.getStatsReporter(config))
113+
.tags(TemporalMetricsHelper.getDimensions(config))
114+
.reportEvery(com.uber.m3.util.Duration.ofSeconds(reportInterval));
115+
103116
WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
104117
.setTarget(connectionUri)
105118
.setEnableHttps(true)
106119
.setSslContext(sslContext)
120+
.setMetricsScope(metricsScope)
107121
.build();
108122
return WorkflowServiceStubs.newServiceStubs(options);
109123
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.temporal.workflows.metrics;
19+
20+
import java.time.Duration;
21+
import java.util.Arrays;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.stream.Collectors;
25+
26+
import org.apache.commons.lang.StringUtils;
27+
import org.jetbrains.annotations.NotNull;
28+
29+
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import com.google.common.annotations.VisibleForTesting;
31+
import com.typesafe.config.Config;
32+
import com.uber.m3.tally.StatsReporter;
33+
34+
import io.micrometer.core.instrument.Clock;
35+
import io.micrometer.core.instrument.MeterRegistry;
36+
import io.micrometer.registry.otlp.OtlpConfig;
37+
import io.micrometer.registry.otlp.OtlpMeterRegistry;
38+
import io.temporal.common.reporter.MicrometerClientStatsReporter;
39+
import lombok.extern.slf4j.Slf4j;
40+
41+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
42+
import org.apache.gobblin.util.ConfigUtils;
43+
44+
45+
@Slf4j
46+
public class TemporalMetricsHelper {
47+
48+
/**
49+
* Retrieves a map of dimension names and their corresponding values from the provided config.
50+
* The dimensions are defined as a comma-separated string in the config, and the method
51+
* fetches the corresponding values for each dimension.
52+
* A missing dimension in config will have empty string as value.
53+
*
54+
* @param config Config object
55+
* @return a map where the key is the dimension name and the value is the corresponding value from the config
56+
*/
57+
public static Map<String, String> getDimensions(Config config) {
58+
String dimensionsString = ConfigUtils.getString(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY, "");
59+
60+
// Split the string by "," and create a map by fetching values from config
61+
return Arrays.stream(dimensionsString.split(","))
62+
.map(String::trim)
63+
.filter(StringUtils::isNotBlank)
64+
.collect(Collectors.toMap(key -> key, key -> ConfigUtils.getString(config, key, ""), (l, r)-> r));
65+
}
66+
67+
/**
68+
* Creates and returns a {@link StatsReporter} instance configured with an {@link OtlpMeterRegistry}.
69+
* This reporter can be used to report metrics via the OpenTelemetry Protocol (OTLP) to a metrics backend.
70+
*
71+
* @param config Config object
72+
* @return a {@link StatsReporter} instance, configured with an OTLP meter registry and ready to report metrics.
73+
*/
74+
public static StatsReporter getStatsReporter(Config config) {
75+
OtlpConfig otlpConfig = getOtlpConfig(config);
76+
MeterRegistry meterRegistry = new OtlpMeterRegistry(otlpConfig, Clock.SYSTEM);
77+
return new MicrometerClientStatsReporter(meterRegistry);
78+
}
79+
80+
@VisibleForTesting
81+
static OtlpConfig getOtlpConfig(Config config) {
82+
return new OtlpConfig() {
83+
@Override
84+
public String get(@NotNull String key) {
85+
return ConfigUtils.getString(config, key, null);
86+
}
87+
88+
@NotNull
89+
@Override
90+
public String prefix() {
91+
return GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT;
92+
}
93+
94+
@NotNull
95+
@Override
96+
public Map<String, String> headers() {
97+
String headers = get(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_HEADERS_KEY);
98+
return parseHeaders(headers);
99+
}
100+
101+
@NotNull
102+
@Override
103+
public Duration step() {
104+
int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
105+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
106+
return Duration.ofSeconds(reportInterval);
107+
}
108+
};
109+
}
110+
111+
private static Map<String, String> parseHeaders(String headersString) {
112+
try {
113+
ObjectMapper mapper = new ObjectMapper();
114+
return mapper.readValue(headersString, HashMap.class);
115+
} catch (Exception e) {
116+
String errMsg = "Failed to parse headers: " + headersString;
117+
log.error(errMsg, e);
118+
throw new RuntimeException(errMsg);
119+
}
120+
}
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gobblin.temporal.workflows.metrics;
18+
19+
20+
import io.micrometer.registry.otlp.OtlpConfig;
21+
22+
import org.testng.Assert;
23+
import org.testng.annotations.BeforeClass;
24+
import org.testng.annotations.Test;
25+
26+
import java.util.Map;
27+
28+
import com.typesafe.config.Config;
29+
import com.typesafe.config.ConfigFactory;
30+
import com.typesafe.config.ConfigValueFactory;
31+
32+
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
33+
34+
35+
/** Test {@link TemporalMetricsHelper} */
36+
public class TemporalMetricsHelperTest {
37+
38+
private Config config;
39+
40+
@BeforeClass
41+
public void setup() {
42+
config = ConfigFactory.empty()
43+
.withValue("prefix", ConfigValueFactory.fromAnyRef("gobblin.temporal.metrics.otlp"))
44+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers",
45+
ConfigValueFactory.fromAnyRef("{\"abc\":\"123\", \"pqr\":\"456\"}"))
46+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".resourceAttributes",
47+
ConfigValueFactory.fromAnyRef("service.name=gobblin-service"))
48+
.withValue("dim1", ConfigValueFactory.fromAnyRef("val1"))
49+
.withValue("dim2", ConfigValueFactory.fromAnyRef("val2"))
50+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions",
51+
ConfigValueFactory.fromAnyRef("dim1,dim2,missingDimension"));
52+
}
53+
54+
@Test
55+
public void testGetDimensions() {
56+
Map<String, String> dimensions = TemporalMetricsHelper.getDimensions(config);
57+
58+
Assert.assertNotNull(dimensions);
59+
Assert.assertEquals(3, dimensions.size());
60+
Assert.assertEquals("val1", dimensions.get("dim1"));
61+
Assert.assertEquals("val2", dimensions.get("dim2"));
62+
Assert.assertEquals("", dimensions.get("missingDimension"));
63+
}
64+
65+
@Test
66+
public void testGetDimensionsEmptyConfig() {
67+
Map<String, String> dimensions = TemporalMetricsHelper.getDimensions(ConfigFactory.empty());
68+
69+
Assert.assertNotNull(dimensions);
70+
Assert.assertEquals(0, dimensions.size());
71+
}
72+
73+
@Test
74+
public void testGetOtlpConfig() {
75+
OtlpConfig otlpConfig = TemporalMetricsHelper.getOtlpConfig(config);
76+
77+
Map<String, String> headers = otlpConfig.headers();
78+
Assert.assertNotNull(headers);
79+
Assert.assertEquals(2, headers.size());
80+
Assert.assertEquals("123", headers.get("abc"));
81+
Assert.assertEquals("456", headers.get("pqr"));
82+
83+
Assert.assertEquals("gobblin-service", otlpConfig.resourceAttributes().get("service.name"));
84+
}
85+
}

gradle/scripts/defaultBuildProperties.gradle

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
4141
.register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository"))
4242
.register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus"))
4343
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce dependencies version"))
44-
.register(new BuildProperty("openTelemetryVersion", "1.29.0", "OpenTelemetry dependencies version"))
44+
.register(new BuildProperty("openTelemetryVersion", "1.30.0", "OpenTelemetry dependencies version"))
45+
.register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer dependencies version"))
4546
task buildProperties(description: 'Lists main properties that can be used to customize the build') {
4647
doLast {
4748
BUILD_PROPERTIES.printHelp();
@@ -74,5 +75,6 @@ BUILD_PROPERTIES.ensureDefined('kafka1Version')
7475
BUILD_PROPERTIES.ensureDefined('pegasusVersion')
7576
BUILD_PROPERTIES.ensureDefined('salesforceVersion')
7677
BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
78+
BUILD_PROPERTIES.ensureDefined('micrometerVersion')
7779

7880
ext.buildProperties = BUILD_PROPERTIES

gradle/scripts/dependencyDefinitions.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ ext.externalDependency = [
120120
"opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" + openTelemetryVersion,
121121
"opentelemetryExporterOtlp": "io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
122122
"opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" + openTelemetryVersion,
123+
"micrometerCore": "io.micrometer:micrometer-core:" + micrometerVersion,
124+
"micrometerRegistry": "io.micrometer:micrometer-registry-otlp:" + micrometerVersion,
123125
"jsch": "com.jcraft:jsch:0.1.54",
124126
"jdo2": "javax.jdo:jdo2-api:2.1",
125127
"azkaban": "com.linkedin.azkaban:azkaban:2.5.0",

0 commit comments

Comments
 (0)