diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 82ea6460902..f2ebcc31ea7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys { String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds"; int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10; String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions"; + + /** + * Activities timeout configs + */ + String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + "activity.heartbeat.timeout.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5; + String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + "activity.heartbeat.interval.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1; + String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = "activity.starttoclose.timeout.minutes"; + int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360; + String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = + PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options"; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient"; + double DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2; + String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts"; + int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4; + } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java new file mode 100644 index 00000000000..7c796ebe35a --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Properties; + +import lombok.Getter; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * Enum representing different types of activities in the Temporal workflow. + * Each activity type corresponds to a specific operation that can be performed. + */ +public enum ActivityType { + /** Activity type for generating work units. */ + GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for recommending scaling operations. */ + RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for deleting work directories. */ + DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for processing a work unit. */ + PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Activity type for committing step. */ + COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES), + + /** Default placeholder activity type. */ + DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES); + + @Getter private final String startToCloseTimeoutConfigKey; + + ActivityType(String startToCloseTimeoutConfigKey) { + this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey; + } + + public ActivityOptions buildActivityOptions(Properties props) { + return ActivityOptions.newBuilder() + .setStartToCloseTimeout(getStartToCloseTimeout(props)) + .setHeartbeatTimeout(getHeartbeatTimeout(props)) + .setRetryOptions(buildRetryOptions(props)) + .build(); + } + + private Duration getStartToCloseTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, this.startToCloseTimeoutConfigKey, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES)); + } + + private Duration getHeartbeatTimeout(Properties props) { + return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES)); + } + + private RetryOptions buildRetryOptions(Properties props) { + int maximumIntervalSeconds = PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS); + + int initialIntervalSeconds = Math.min(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS), + maximumIntervalSeconds); + + return RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(initialIntervalSeconds)) + .setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds)) + .setBackoffCoefficient(PropertiesUtils.getPropAsDouble(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT)) + .setMaximumAttempts(PropertiesUtils.getPropAsInt(props, + GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS)) + .build(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index b43c079c101..b106cdc1532 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -27,6 +27,9 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -41,6 +44,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.io.Closer; + +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; import io.temporal.failure.ApplicationFailure; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; @@ -85,12 +91,21 @@ public class CommitActivityImpl implements CommitActivity { @Override public CommitStats commit(WUProcessingSpec workSpec) { + ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext(); + ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor"))); // TODO: Make this configurable int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS; Optional optJobName = Optional.empty(); AutomaticTroubleshooter troubleshooter = null; try (FileSystem fs = Help.loadFileSystem(workSpec)) { JobState jobState = Help.loadJobState(workSpec, fs); + + int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); + heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running Commit Activity"), + heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); + optJobName = Optional.ofNullable(jobState.getJobName()); SharedResourcesBroker instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState); troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties()); @@ -147,6 +162,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec)); EventSubmitter eventSubmitter = workSpec.getEventSubmitterContext().create(); Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, errCorrelator); + ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log)); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index 33f60bd779d..fdeaf424870 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -24,6 +24,9 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.FileSystem; @@ -38,6 +41,8 @@ import com.google.common.io.Closer; import com.tdunning.math.stats.TDigest; import io.temporal.failure.ApplicationFailure; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -66,6 +71,7 @@ import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.temporal.workflows.metrics.EventTimer; import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; +import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.writer.initializer.WriterInitializer; import org.apache.gobblin.writer.initializer.WriterInitializerFactory; @@ -122,11 +128,19 @@ private static class WorkUnitsWithInsights { @Override public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) { + ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext(); + ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("GenerateWorkUnitsActivityHeartBeatExecutor"))); // TODO: decide whether to acquire a job lock (as MR did)! // TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)! JobState jobState = new JobState(jobProps); log.info("Created jobState: {}", jobState.toJsonString(true)); + int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); + heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"), + heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); + Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState); log.info("Using work dir root path for job '{}' - '{}'", jobState.getJobId(), workDirRoot); @@ -177,6 +191,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi } finally { EventSubmitter eventSubmitter = eventSubmitterContext.create(); Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, jobState.getJobId()); + ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log)); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java index a6753245b7e..9450bc7bcb9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -28,6 +31,8 @@ import org.apache.hadoop.fs.Path; import com.google.common.collect.Lists; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityExecutionContext; import lombok.extern.slf4j.Slf4j; @@ -56,6 +61,7 @@ import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.JobLauncherUtils; @@ -68,6 +74,10 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit { @Override public int processWorkUnit(WorkUnitClaimCheck wu) { + ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext(); + ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor"))); AutomaticTroubleshooter troubleshooter = null; EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create(); String correlator = String.format("(M)WU [%s]", wu.getCorrelator()); @@ -75,6 +85,9 @@ public int processWorkUnit(WorkUnitClaimCheck wu) { List workUnits = loadFlattenedWorkUnits(wu, fs); log.info("{} - loaded; found {} workUnits", correlator, workUnits.size()); JobState jobState = Help.loadJobState(wu, fs); + int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState); + heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"), + heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES); troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties()); troubleshooter.start(); return execute(workUnits, wu, jobState, fs, troubleshooter.getIssueRepository()); @@ -82,6 +95,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) { throw new RuntimeException(e); } finally { Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, correlator); + ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log)); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java index 709a9cf935f..954269948fe 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java @@ -112,7 +112,7 @@ public void submitJob(List workunits) { Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec))); ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options); - workflow.process(wuSpec); + workflow.process(wuSpec, jobProps); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java index e6066019562..be31d3ec4f4 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java @@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.source.Source; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.util.JobLauncherUtils; @@ -238,4 +239,9 @@ public static SharedResourcesBroker getSharedResourcesBroker( GobblinScopeTypes.GLOBAL.defaultScopeInstance()); return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build(); } + + public static int getHeartBeatInterval(JobState jobState) { + return jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES); + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java index dfa207f66c6..3a05e60f013 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java @@ -18,7 +18,6 @@ package org.apache.gobblin.temporal.ddm.work; import java.net.URI; -import java.util.Optional; import org.apache.hadoop.fs.Path; @@ -36,8 +35,6 @@ import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; -import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; -import org.apache.gobblin.temporal.util.nesting.work.Workload; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; @@ -75,7 +72,7 @@ public Path getJobStatePath() { return new Path(new Path(workUnitsDir).getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME); } - /** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/ + /** Configuration for {@link org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput} */ @Data @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization @NoArgsConstructor // IMPORTANT: for jackson (de)serialization diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java index c5368285202..e47c0a9b15b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java @@ -17,6 +17,8 @@ package org.apache.gobblin.temporal.ddm.workflow; +import java.util.Properties; + import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; @@ -35,5 +37,5 @@ public interface CommitStepWorkflow { * @return number of workunits committed */ @WorkflowMethod - CommitStats commit(WUProcessingSpec workSpec); + CommitStats commit(WUProcessingSpec workSpec, Properties props); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java index a6018d41f17..8adfd34185d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java @@ -17,6 +17,8 @@ package org.apache.gobblin.temporal.ddm.workflow; +import java.util.Properties; + import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; @@ -30,5 +32,5 @@ public interface ProcessWorkUnitsWorkflow { /** @return the number of {@link WorkUnit}s cumulatively processed successfully */ @WorkflowMethod - CommitStats process(WUProcessingSpec wuSpec); + CommitStats process(WUProcessingSpec workSpec, Properties props); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index 0f289018838..b4406589048 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -17,13 +17,11 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Properties; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.Workflow; @@ -31,6 +29,7 @@ import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.DatasetTaskSummary; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.DatasetStats; @@ -42,22 +41,9 @@ @Slf4j public class CommitStepWorkflowImpl implements CommitStepWorkflow { - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(3)) // TODO: make configurable... also add activity heartbeats - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS); - @Override - public CommitStats commit(WUProcessingSpec workSpec) { + public CommitStats commit(WUProcessingSpec workSpec, final Properties props) { + final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ActivityType.COMMIT.buildActivityOptions(props)); CommitStats commitGobblinStats = activityStub.commit(workSpec); if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext()); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 15184c6ccc9..661847bdce5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -39,9 +39,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import io.temporal.activity.ActivityOptions; import io.temporal.api.enums.v1.ParentClosePolicy; -import io.temporal.common.RetryOptions; import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; @@ -50,6 +48,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity; import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits; @@ -72,6 +71,7 @@ import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.temporal.workflows.metrics.EventTimer; import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; @@ -81,49 +81,6 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits"; - public static final Duration genWUsStartToCloseTimeout = Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats - - private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(genWUsStartToCloseTimeout) - .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS) - .build(); - - private final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS); - - private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofMinutes(5)) - .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS) - .build(); - private final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class, - RECOMMEND_SCALING_ACTIVITY_OPTS); - - private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofMinutes(10)) - .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS) - .build(); - private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS); - @Override public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) { TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); @@ -131,8 +88,13 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event EventTimer jobSuccessTimer = timerFactory.createJobTimer(); Optional optGenerateWorkUnitResult = Optional.empty(); WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); + // Filtering only temporal job properties to pass to child workflows to avoid passing unnecessary properties + final Properties temporalJobProps = PropertiesUtils.extractPropertiesWithPrefix(jobProps, + com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX)); boolean isSuccessful = false; try (Closer closer = Closer.create()) { + final GenerateWorkUnits genWUsActivityStub = Workflow.newActivityStub(GenerateWorkUnits.class, + ActivityType.GENERATE_WORKUNITS.buildActivityOptions(temporalJobProps)); GenerateWorkUnitsResult generateWorkUnitResult = genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext); optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult); WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResult.getWorkUnitsSizeSummary(); @@ -141,6 +103,8 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event CommitStats commitStats = CommitStats.createEmpty(); if (numWUsGenerated > 0) { TimeBudget timeBudget = calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps); + final RecommendScalingForWorkUnits recommendScalingStub = Workflow.newActivityStub(RecommendScalingForWorkUnits.class, + ActivityType.RECOMMEND_SCALING.buildActivityOptions(temporalJobProps)); List scalingDirectives = recommendScalingStub.recommendScaling(wuSizeSummary, generateWorkUnitResult.getSourceClass(), timeBudget, jobProps); log.info("Recommended scaling to process WUs within {}: {}", timeBudget, scalingDirectives); @@ -156,7 +120,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event } ProcessWorkUnitsWorkflow processWUsWorkflow = createProcessWorkUnitsWorkflow(jobProps); - commitStats = processWUsWorkflow.process(wuSpec); + commitStats = processWUsWorkflow.process(wuSpec, temporalJobProps); numWUsCommitted = commitStats.getNumCommittedWorkUnits(); } jobSuccessTimer.stop(); @@ -270,6 +234,11 @@ private void cleanupWorkDirs(WUProcessingSpec workSpec, EventSubmitterContext ev return; } + final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub( + DeleteWorkDirsActivity.class, + ActivityType.DELETE_WORK_DIRS.buildActivityOptions(jobState.getProperties()) + ); + DirDeletionResult dirDeletionResult = deleteWorkDirsActivityStub.delete(workSpec, eventSubmitterContext, calculateWorkDirsToDelete(jobState.getJobId(), directoriesToClean)); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java index 96591e65475..6f22e3dd8bf 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java @@ -17,13 +17,13 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; @@ -31,25 +31,11 @@ /** {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow} for {@link ProcessWorkUnit} */ public class NestingExecOfProcessWorkUnitWorkflowImpl extends AbstractNestingExecWorkflowImpl { - public static final Duration processWorkUnitStartToCloseTimeout = Duration.ofHours(3); // TODO: make configurable... also add activity heartbeats - - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(3)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(4) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(processWorkUnitStartToCloseTimeout) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final ProcessWorkUnit activityStub = Workflow.newActivityStub(ProcessWorkUnit.class, ACTIVITY_OPTS); @Override - protected Promise launchAsyncActivity(final WorkUnitClaimCheck wu) { - return Async.function(activityStub::processWorkUnit, wu); + protected Promise launchAsyncActivity(final WorkUnitClaimCheck wu, final Properties props) { + final ProcessWorkUnit processWorkUnitStub = Workflow.newActivityStub(ProcessWorkUnit.class, + ActivityType.PROCESS_WORKUNIT.buildActivityOptions(props)); + return Async.function(processWorkUnitStub::processWorkUnit, wu); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 7b3f171967a..00def8a2ee2 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -18,10 +18,10 @@ import java.util.Map; import java.util.Optional; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; -import com.google.common.io.Closer; import com.typesafe.config.ConfigFactory; import io.temporal.api.enums.v1.ParentClosePolicy; @@ -33,6 +33,7 @@ import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload; +import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.work.assistance.Help; @@ -45,7 +46,6 @@ import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.temporal.workflows.metrics.EventTimer; import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; -import org.apache.gobblin.runtime.JobState; @Slf4j @@ -54,37 +54,30 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow"; @Override - public CommitStats process(WUProcessingSpec workSpec) { + public CommitStats process(WUProcessingSpec workSpec, final Properties props) { Optional timer = this.createOptJobEventTimer(workSpec); - CommitStats result = performWork(workSpec); + CommitStats result = performWork(workSpec, props); timer.ifPresent(EventTimer::stop); return result; } - private CommitStats performWork(WUProcessingSpec workSpec) { + private CommitStats performWork(WUProcessingSpec workSpec, final Properties props) { Workload workload = createWorkload(workSpec); - Map searchAttributes; - JobState jobState; - try (Closer closer = Closer.create()) { - jobState = Help.loadJobState(workSpec, closer.register(Help.loadFileSystem(workSpec))); - } catch (Exception e) { - log.error("Error loading jobState", e); - throw new RuntimeException("Error loading jobState", e); - } - searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); - + Map searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(props); NestingExecWorkflow processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes); Optional workunitsProcessed = Optional.empty(); try { - workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, + NestingExecWorkloadInput + performWorkloadInput = new NestingExecWorkloadInput<>(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), - Optional.empty())); + Optional.empty(), props); + workunitsProcessed = Optional.of(processingWorkflow.performWorkload(performWorkloadInput)); } catch (Exception e) { log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); try { - performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure + performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed, props);// Attempt partial commit before surfacing the failure } catch (Exception commitException) { // Combine current and commit exception messages for a more complete context String combinedMessage = String.format( @@ -100,18 +93,18 @@ private CommitStats performWork(WUProcessingSpec workSpec) { } throw e;// Re-throw after any partial commit, to fail the parent workflow in case commitWorkflow didn't flow (unlikely) } - return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed); + return performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed, props); } private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSpec, - Map searchAttributes, Optional workunitsProcessed) { + Map searchAttributes, Optional workunitsProcessed, Properties props) { // we are only inhibiting commit when workunitsProcessed is actually known to be zero if (workunitsProcessed.filter(n -> n == 0).isPresent()) { log.error("No work units processed, so no commit attempted."); return CommitStats.createEmpty(); } CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes); - CommitStats result = commitWorkflow.commit(workSpec); + CommitStats result = commitWorkflow.commit(workSpec, props); if (result.getNumCommittedWorkUnits() == 0) { log.warn("No work units committed at the job level. They could have been committed at the task level."); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java index 86f1d4a0927..5315f464f38 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java @@ -38,6 +38,7 @@ import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler; import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; import org.apache.gobblin.temporal.loadgen.work.SimpleGeneratedWorkload; +import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput; import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; @@ -86,6 +87,8 @@ public void submitJob(List workunits) { // WARNING: although type param must agree w/ that of `workload`, it's entirely unverified by type checker! // ...and more to the point, mismatch would occur at runtime (`performWorkload` on the workflow type given to the stub)! NestingExecWorkflow workflow = this.client.newWorkflowStub(NestingExecWorkflow.class, options); - workflow.performWorkload(WorkflowAddr.ROOT, workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty()); + NestingExecWorkloadInput performWorkloadInput = new NestingExecWorkloadInput<>(WorkflowAddr.ROOT, + workload, 0, maxBranchesPerTree, maxSubTreesPerTree, Optional.empty(), new Properties()); + workflow.performWorkload(performWorkloadInput); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java index 4346eecfdfa..5355d9442d4 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/workflow/impl/NestingExecOfIllustrationItemActivityWorkflowImpl.java @@ -17,12 +17,13 @@ package org.apache.gobblin.temporal.loadgen.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; +import java.util.Properties; + import io.temporal.workflow.Async; import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import java.time.Duration; + +import org.apache.gobblin.temporal.ddm.activity.ActivityType; import org.apache.gobblin.temporal.loadgen.activity.IllustrationItemActivity; import org.apache.gobblin.temporal.loadgen.work.IllustrationItem; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; @@ -32,24 +33,10 @@ public class NestingExecOfIllustrationItemActivityWorkflowImpl extends AbstractNestingExecWorkflowImpl { - // RetryOptions specify how to automatically handle retries when Activities fail. - private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() - .setInitialInterval(Duration.ofSeconds(1)) - .setMaximumInterval(Duration.ofSeconds(100)) - .setBackoffCoefficient(2) - .setMaximumAttempts(3) - .build(); - - private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(10)) - .setRetryOptions(ACTIVITY_RETRY_OPTS) - .build(); - - private final IllustrationItemActivity activityStub = - Workflow.newActivityStub(IllustrationItemActivity.class, ACTIVITY_OPTS); - @Override - protected Promise launchAsyncActivity(final IllustrationItem item) { + protected Promise launchAsyncActivity(final IllustrationItem item, final Properties props) { + final IllustrationItemActivity activityStub = + Workflow.newActivityStub(IllustrationItemActivity.class,ActivityType.DEFAULT_ACTIVITY.buildActivityOptions(props)); return Async.function(activityStub::handleItem, item); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java new file mode 100644 index 00000000000..4b2f3ff50e5 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/NestingExecWorkloadInput.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import java.util.Optional; +import java.util.Properties; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.Setter; + + +/** A wrapper class to be used as function param for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(NestingExecWorkloadInput)}*/ +@Data +@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@AllArgsConstructor +public class NestingExecWorkloadInput { + private WorkflowAddr addr; + private Workload workload; + private int startIndex; + private int maxBranchesPerTree; + private int maxSubTreesPerTree; + private Optional maxSubTreesForCurrentTreeOverride; + private Properties props; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 92ef6e1af9f..1863724a1fa 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -35,6 +36,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils; +import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput; import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; @@ -46,14 +48,18 @@ public abstract class AbstractNestingExecWorkflowImpl workload, - final int startIndex, - final int maxBranchesPerTree, - final int maxSubTreesPerTree, - final Optional maxSubTreesForCurrentTreeOverride - ) { + public int performWorkload(NestingExecWorkloadInput performWorkloadInput) { + return performWorkloadInternal(performWorkloadInput); + } + + private int performWorkloadInternal(NestingExecWorkloadInput performWorkloadInput) { + final WorkflowAddr addr = performWorkloadInput.getAddr(); + final Workload workload = performWorkloadInput.getWorkload(); + final int startIndex = performWorkloadInput.getStartIndex(); + final int maxBranchesPerTree = performWorkloadInput.getMaxBranchesPerTree(); + final int maxSubTreesPerTree = performWorkloadInput.getMaxSubTreesPerTree(); + final Optional maxSubTreesForCurrentTreeOverride = performWorkloadInput.getMaxSubTreesForCurrentTreeOverride(); + final Properties props = performWorkloadInput.getProps(); final int maxSubTreesForCurrent = maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree); final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent; final Optional> optSpan = workload.getSpan(startIndex, maxLeaves); @@ -65,7 +71,7 @@ public int performWorkload( final Workload.WorkSpan workSpan = optSpan.get(); final Iterable iterable = () -> workSpan; final List> childActivities = StreamSupport.stream(iterable.spliterator(), false) - .map(t -> launchAsyncActivity(t)) + .map(t -> launchAsyncActivity(t, props)) .collect(Collectors.toList()); final List> childSubTrees = new ArrayList<>(); if (workSpan.getNumElems() == maxLeaves) { // received as many as requested (did not stop short) @@ -84,9 +90,11 @@ public int performWorkload( if (numDirectLeavesChildMayHave > 0) { Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave)); } - childSubTrees.add( - Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, - maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); + NestingExecWorkloadInput childInput = new NestingExecWorkloadInput<>( + childAddr, workload, childStartIndex, maxBranchesPerTree, + maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree), props + ); + childSubTrees.add(Async.function(child::performWorkload, childInput)); ++subTreeId; } } @@ -103,7 +111,7 @@ public int performWorkload( } /** Factory for invoking the specific activity by providing it args via {@link Async::function} */ - protected abstract Promise launchAsyncActivity(WORK_ITEM task); + protected abstract Promise launchAsyncActivity(WORK_ITEM task, Properties props); protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr childAddr) { // preserve the current workflow ID of this parent, but add the (hierarchical) address extension specific to each child diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java index 3a6661d0907..e0c3ebfa9dd 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java @@ -17,12 +17,10 @@ package org.apache.gobblin.temporal.util.nesting.workflow; -import java.util.Optional; - import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; +import org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput; import org.apache.gobblin.temporal.util.nesting.work.Workload; @@ -44,12 +42,5 @@ public interface NestingExecWorkflow { /** @return the number of workload elements processed cumulatively by this Workflow and its children */ @WorkflowMethod - int performWorkload( - WorkflowAddr addr, - Workload workload, - int startIndex, - int maxBranchesPerTree, - int maxSubTreesPerTree, - Optional maxSubTreesForCurrentTreeOverride - ); + int performWorkload(NestingExecWorkloadInput performWorkloadInput); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java new file mode 100644 index 00000000000..cdab706d869 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import io.temporal.activity.ActivityOptions; + +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + + +/** Tests for {@link ActivityType} */ +public class ActivityTypeTest { + + private Properties props; + private final List activityTypes = Arrays.asList(ActivityType.values()); + + @BeforeMethod + public void setUp() { + props = new Properties(); + } + + @Test + public void testDefaultValuesForTimeouts() { + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + Assert.assertEquals(activityOptions.getStartToCloseTimeout(), + Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES)); + Assert.assertEquals(activityOptions.getHeartbeatTimeout(), + Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES)); + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), + Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), + Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, 0.01); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS); + }); + } + + @DataProvider(name = "activityTypesWithStartToCloseTimeout") + public Object[][] activityTypesWithStartToCloseTimeout() { + return new Object[][] { + {ActivityType.GENERATE_WORKUNITS, 333}, + {ActivityType.RECOMMEND_SCALING, 111}, + {ActivityType.DELETE_WORK_DIRS, 222}, + {ActivityType.PROCESS_WORKUNIT, 555}, + {ActivityType.COMMIT, 444}, + {ActivityType.DEFAULT_ACTIVITY, 1} + }; + } + + @Test(dataProvider = "activityTypesWithStartToCloseTimeout") + public void testStartToCloseTimeout(ActivityType activityType, int expectedTimeout) { + props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout)); + Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout)); + } + + @Test + public void testHeartBeatTimeout() { + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, "14"); + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(14)); + }); + } + + @Test + public void testRetryOptions() { + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, "115"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, "5550"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7.0"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21"); + + activityTypes.stream().map(activityType -> activityType.buildActivityOptions(props)).forEach(activityOptions -> { + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), Duration.ofSeconds(115)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), Duration.ofSeconds(5550)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 7.0, 0.01); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21); + }); + } + + @Test(dataProvider = "activityTypesWithStartToCloseTimeout") + public void testBuildActivityOptions(ActivityType activityType, int expectedTimeout) { + props.setProperty(activityType.getStartToCloseTimeoutConfigKey(), Integer.toString(expectedTimeout)); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES, "144"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS, "115"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, "5550"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT, "7.0"); + props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, "21"); + + ActivityOptions activityOptions = activityType.buildActivityOptions(props); + + Assert.assertEquals(activityOptions.getStartToCloseTimeout(), Duration.ofMinutes(expectedTimeout)); + Assert.assertEquals(activityOptions.getHeartbeatTimeout(), Duration.ofMinutes(144)); + Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(), Duration.ofSeconds(115)); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(), Duration.ofSeconds(5550)); + Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(), 7.0, 0.01); + Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21); + } + +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index 28f0daa93f3..db5518f54d5 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -92,6 +92,14 @@ public static long getPropAsLong(Properties properties, String key, long default return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue))); } + public static double getPropAsDouble(Properties properties, String key, double defaultValue) { + try { + return Double.parseDouble(properties.getProperty(key, Double.toString(defaultValue))); + } catch (NullPointerException | NumberFormatException e) { + return defaultValue; + } + } + /** * Get the value of a comma separated property as a {@link List} of strings. * diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java index 42291f4caee..856067220db 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java @@ -101,4 +101,20 @@ public void testGetValuesAsList() { Assert.assertEqualsNoOrder(PropertiesUtils.getValuesAsList(properties, Optional.of("k")).toArray(), new String[]{"v1", "v2", "v2"}); } + + @Test + public void testGetPropAsDouble() { + Properties properties = new Properties(); + properties.put("k1", "1.0"); + properties.put("k2", "1"); + properties.put("k3", "1.00"); + properties.put("k4", ""); + + Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "key", 5.01), 5.01, 0.01); + Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k1", 2.02), 1.00, 0.01); + Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k2", 2.02), 1.00, 0.01); + Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k3", 2.02), 1.00, 0.01); + Assert.assertEquals(PropertiesUtils.getPropAsDouble(properties, "k4", 10.001), 10.001, 0.001); + } + }