From e584e5b49d24aaed398d082cf1e07bac8c13f453 Mon Sep 17 00:00:00 2001 From: Vivek Rai <43493515+Blazer-007@users.noreply.github.com> Date: Mon, 20 Jan 2025 09:52:45 +0530 Subject: [PATCH] [GOBBLIN-2182] Cleanup workdir in Gobblin-on-Temporal execution (#4085) * Added work directory cleanup for Gobblin-on-Temporal (GoT), controlled by a config --- .../GobblinTemporalConfigurationKeys.java | 2 ++ .../temporal/joblauncher/GobblinJobLauncher.java | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index e90e901a56f..0201cda0fb8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -43,6 +43,8 @@ public interface GobblinTemporalConfigurationKeys { String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg."; String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides"; + String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX + "work.dir.cleanup.enabled"; + String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true"; /** * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index ea2c2ce7c2e..13ecfd1ebc9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -59,6 +59,8 @@ import org.apache.gobblin.source.workunit.WorkUnit; 
import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ParallelRunner; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; /** * An implementation of {@link JobLauncher} that launches a Gobblin job using the Temporal task framework. @@ -134,7 +136,11 @@ public void close() throws IOException { try { executeCancellation(); } finally { - super.close(); + try { + cleanupWorkingDirectory(); + } finally { + super.close(); + } } } @@ -277,6 +283,13 @@ protected void cleanupWorkingDirectory() throws IOException { GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, this.jobContext.getJobId()); this.fs.delete(jobStateFilePath, false); } + + if (Boolean.parseBoolean(this.jobProps.getProperty(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED, + GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED))) { + Path workDirRootPath = JobStateUtils.getWorkDirRoot(this.jobContext.getJobState()); + log.info("Cleaning up work directory : {} for job : {}", workDirRootPath, this.jobContext.getJobId()); + this.fs.delete(workDirRootPath, true); + } } }