Skip to content

Commit

Permalink
[GOBBLIN-2190] Added SUBMITGTE activity to ActivityType enum (#4102)
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 authored Feb 26, 2025
1 parent f2bcdc7 commit 1d93982
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public interface GobblinTemporalConfigurationKeys {
PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public enum ActivityType {
/** Activity type for committing step. */
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Activity type for submitting GTE. */
SUBMIT_GTE(GobblinTemporalConfigurationKeys.TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Default placeholder activity type. */
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public CommitStats commit(WUProcessingSpec workSpec, final Properties props) {
final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), props);
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
.submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {

@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
// Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, temporalJobProps);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext);
// Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties
final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps,
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
boolean isSuccessful = false;
try (Closer closer = Closer.create()) {
final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {

@Override
public CommitStats process(WUProcessingSpec workSpec, final Properties props) {
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec, props);
CommitStats result = performWork(workSpec, props);
timer.ifPresent(EventTimer::stop);
return result;
Expand Down Expand Up @@ -111,10 +111,10 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp
return result;
}

private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec) {
private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec, Properties props) {
if (workSpec.isToDoJobLevelTiming()) {
EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext();
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, props);
return Optional.of(timerFactory.createJobTimer());
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.workflows.helloworld;

import java.time.Duration;
import java.util.Properties;

import org.slf4j.Logger;

Expand Down Expand Up @@ -57,7 +58,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterConte
/**
* Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
*/
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext, new Properties());
try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
timer.withMetadata("name", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.function.Supplier;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.ActivityType;


/**
Expand Down Expand Up @@ -153,16 +154,9 @@ public TemporalEventTimer createWorkPreparationTimer() {
* addition, it uses the `Workflow`-safe {@link Workflow#currentTimeMillis()}.
*/
public static class WithinWorkflowFactory extends Factory {
private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for the actual event submission to kafka, waiting out a kafka outage
.build();

public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) {
this(eventSubmitterContext, DEFAULT_OPTS);
}

public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) {
super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, opts), WithinWorkflowFactory::getCurrentInstant);
public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, Properties props) {
super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, ActivityType.SUBMIT_GTE.buildActivityOptions(props)), WithinWorkflowFactory::getCurrentInstant);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ public Object[][] activityTypesWithStartToCloseTimeout() {
{ActivityType.DELETE_WORK_DIRS, 222},
{ActivityType.PROCESS_WORKUNIT, 555},
{ActivityType.COMMIT, 444},
{ActivityType.SUBMIT_GTE, 999},
{ActivityType.DEFAULT_ACTIVITY, 1}
};
}

@Test
public void testActivityTypesWithStartToCloseTimeoutDataProviderHasAllActivityTypes() {
Assert.assertEquals(activityTypesWithStartToCloseTimeout().length, activityTypes.size());
}

@Test(dataProvider = "activityTypesWithStartToCloseTimeout")
public void testStartToCloseTimeout(ActivityType activityType, int expectedTimeout) {
props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout));
Expand Down

0 comments on commit 1d93982

Please sign in to comment.