diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index f2ebcc31ea..ffb1c424b5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -104,6 +104,8 @@ public interface GobblinTemporalConfigurationKeys { PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options"; String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds"; int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java index 7c796ebe35..54953abe10 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java @@ -49,6 +49,9 @@ public enum ActivityType { /** Activity type for committing step. */ COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + /** Activity type for submitting GTE. */ + SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + /** Default placeholder activity type. */ DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index b440658904..5a3350fc7d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -46,7 +46,7 @@ public CommitStats commit(WUProcessingSpec workSpec, final Properties props) { final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props)); CommitStats commitGobblinStats = activityStub.commit(workSpec); if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext()); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), props); timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())) .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 661847bdce..d841b00c22 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -83,14 +83,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { @Override public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) { - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); + // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties + final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps, + com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, temporalJobProps); timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` EventTimer jobSuccessTimer = timerFactory.createJobTimer(); Optional optGenerateWorkUnitResult = Optional.empty(); WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); - // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties - final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps, - com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)); boolean isSuccessful = false; try (Closer closer = Closer.create()) { final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 00def8a2ee..e0119cfc5d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -55,7 +55,7 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { @Override public CommitStats process(WUProcessingSpec workSpec, final Properties props) { - Optional timer = this.createOptJobEventTimer(workSpec); + Optional timer = this.createOptJobEventTimer(workSpec, props); CommitStats result = performWork(workSpec, props); timer.ifPresent(EventTimer::stop); return result; @@ -111,10 +111,10 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp return result; } - private Optional createOptJobEventTimer(WUProcessingSpec workSpec) { + private Optional createOptJobEventTimer(WUProcessingSpec workSpec, Properties props) { if (workSpec.isToDoJobLevelTiming()) { EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext(); - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, props); return Optional.of(timerFactory.createJobTimer()); } else { return Optional.empty(); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java index c576a49550..ea930b0573 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java @@ -18,6 +18,7 @@ package org.apache.gobblin.temporal.workflows.helloworld; import java.time.Duration; +import java.util.Properties; import org.slf4j.Logger; @@ -57,7 +58,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterConte /** * Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation. */ - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, new Properties()); try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) { LOG.info("Executing getGreeting"); timer.withMetadata("name", name); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java index 4a6aa8d0bb..6686b5cef9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java @@ -19,19 +19,20 @@ import java.time.Duration; import java.time.Instant; +import java.util.Properties; import java.util.function.Supplier; import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; -import io.temporal.activity.ActivityOptions; import io.temporal.workflow.Workflow; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.util.GsonUtils; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; /** @@ -153,16 +154,9 @@ public TemporalEventTimer createWorkPreparationTimer() { * addition, it uses the `Workflow`-safe {@link Workflow#currentTimeMillis()}. */ public static class WithinWorkflowFactory extends Factory { - private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for the actual event submission to kafka, waiting out a kafka outage - .build(); - public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) { - this(eventSubmitterContext, DEFAULT_OPTS); - } - - public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) { - super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, opts), WithinWorkflowFactory::getCurrentInstant); + public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, Properties props) { + super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, ActivityType.SUBMIT_GTE.buildActivityOptions(props)), WithinWorkflowFactory::getCurrentInstant); } /** diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java index cdab706d86..d94bda0ca5 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java @@ -69,10 +69,16 @@ public Object[][] activityTypesWithStartToCloseTimeout() { {ActivityType.DELETE_WORK_DIRS, 222}, {ActivityType.PROCESS_WORKUNIT, 555}, {ActivityType.COMMIT, 444}, + {ActivityType.SUBMIT_GTE, 999}, {ActivityType.DEFAULT_ACTIVITY, 1} }; } + @Test + public void testActivityTypesWithStartToCloseTimeoutDataProviderHasAllActivityTypes() { + Assert.assertEquals(activityTypesWithStartToCloseTimeout().length, activityTypes.size()); + } + @Test(dataProvider = "activityTypesWithStartToCloseTimeout") public void testStartToCloseTimeout(ActivityType activityType, int expectedTimeout) { props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout));