Skip to content

Commit

Permalink
[GOBBLIN-2190] Added param to create Activityoptions without Heartbea…
Browse files Browse the repository at this point in the history
…tTimeout (#4103)
  • Loading branch information
Blazer-007 authored Feb 27, 2025
1 parent 1d93982 commit e64f707
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public interface GobblinTemporalConfigurationKeys {
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_SUBMIT_GTE_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "submit.gte." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options.";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,24 @@ public enum ActivityType {
this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
}

public ActivityOptions buildActivityOptions(Properties props) {
public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout) {
if (!setHeartbeatTimeout) {
return buildActivityOptionsWithoutHeartBeatTimeout(props);
}
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setHeartbeatTimeout(getHeartbeatTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.build();
}

private ActivityOptions buildActivityOptionsWithoutHeartBeatTimeout(Properties props) {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.build();
}

private Duration getStartToCloseTimeout(Properties props) {
return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, this.startToCloseTimeoutConfigKey,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {

@Override
public CommitStats commit(WUProcessingSpec workSpec, final Properties props) {
final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props));
final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props, true));
CommitStats commitGobblinStats = activityStub.commit(workSpec);
if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext(), props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
boolean isSuccessful = false;
try (Closer closer = Closer.create()) {
final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class,
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps));
ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps, true));
GenerateWorkUnitsResult generateWorkUnitResult = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResult.getWorkUnitsSizeSummary();
Expand All @@ -104,7 +104,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event
if (numWUsGenerated > 0) {
TimeBudget timeBudget = calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
ActivityType.RECOMMEND_SCALING.buildActivityOptions(temporalJobProps));
ActivityType.RECOMMEND_SCALING.buildActivityOptions(temporalJobProps, false));
List<ScalingDirective> scalingDirectives =
recommendScalingStub.recommendScaling(wuSizeSummary, generateWorkUnitResult.getSourceClass(), timeBudget, jobProps);
log.info("Recommended scaling to process WUs within {}: {}", timeBudget, scalingDirectives);
Expand Down Expand Up @@ -236,7 +236,7 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev

final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(
DeleteWorkDirsActivity.class,
ActivityType.DELETE_WORK_DIRS.buildActivityOptions(jobState.getProperties())
ActivityType.DELETE_WORK_DIRS.buildActivityOptions(jobState.getProperties(), false)
);

DirDeletionResult dirDeletionResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExe
@Override
protected Promise<Integer> launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) {
final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class,
ActivityType.PROCESS_WORKUNIT.buildActivityOptions(props));
ActivityType.PROCESS_WORKUNIT.buildActivityOptions(props, true));
return Async.function(processWorkUnitStub::processWorkUnit, wu);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class NestingExecOfIllustrationItemActivityWorkflowImpl
@Override
protected Promise<String> launchAsyncActivity(final IllustrationItem item, final Properties props) {
final IllustrationItemActivity activityStub =
Workflow.newActivityStub(IllustrationItemActivity.class,ActivityType.DEFAULT_ACTIVITY.buildActivityOptions(props));
Workflow.newActivityStub(IllustrationItemActivity.class,ActivityType.DEFAULT_ACTIVITY.buildActivityOptions(props, false));
return Async.function(activityStub::handleItem, item);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public TemporalEventTimer createWorkPreparationTimer() {
public static class WithinWorkflowFactory extends Factory {

public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, Properties props) {
super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, ActivityType.SUBMIT_GTE.buildActivityOptions(props)), WithinWorkflowFactory::getCurrentInstant);
super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, ActivityType.SUBMIT_GTE.buildActivityOptions(props, false)), WithinWorkflowFactory::getCurrentInstant);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setUp() {

@Test
public void testDefaultValuesForTimeouts() {
activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> {
activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props, true)).forEach(activityOptions -> {
Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
Expand Down Expand Up @@ -82,13 +82,13 @@ public void testActivityTypesWithStartToCloseTimeoutDataProviderHasAllActivityTy
@Test(dataProvider = "activityTypesWithStartToCloseTimeout")
public void testStartToCloseTimeout(ActivityType activityType, int expectedTimeout) {
props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout));
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout));
Assert.assertEquals(activityType.buildActivityOptions(props, false).getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout));
}

@Test
public void testHeartBeatTimeout() {
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, "14");
activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> {
activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props, true)).forEach(activityOptions -> {
Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(14));
});
}
Expand All @@ -100,7 +100,7 @@ public void testRetryOptions() {
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7.0");
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21");

activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> {
activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props, false)).forEach(activityOptions -> {
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), Duration.ofSeconds(115));
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), Duration.ofSeconds(5550));
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 7.0, 0.01);
Expand All @@ -117,7 +117,7 @@ public void testBuildActivityOptions(ActivityType activityType, int expectedTime
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7.0");
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21");

ActivityOptions activityOptions = activityType.buildActivityOptions(props);
ActivityOptions activityOptions = activityType.buildActivityOptions(props, true);

Assert.assertEquals(activityOptions.getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout));
Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(144));
Expand Down

0 comments on commit e64f707

Please sign in to comment.