From 003590e7d58691505d8fb73988db35ae636873f1 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Tue, 10 Dec 2024 21:16:22 -0800 Subject: [PATCH] [GOBBLIN-2179] Enhance GoT observability with `WorkUnitsSizeSummary` and `WorkUnitSizeInfo` (#4082) --- .../AutoTroubleshooterLogAppender.java | 2 +- .../gobblin/runtime/troubleshooter/Issue.java | 6 +- gobblin-temporal/build.gradle | 1 + .../ddm/activity/GenerateWorkUnits.java | 5 + .../activity/impl/GenerateWorkUnitsImpl.java | 112 ++++++++++++-- .../temporal/ddm/util/JobStateUtils.java | 18 ++- ...FsDirBackedWorkUnitClaimCheckWorkload.java | 29 +++- .../ddm/work/GenerateWorkUnitsResult.java | 3 + .../work/PriorJobStateWUProcessingSpec.java | 3 +- .../temporal/ddm/work/WorkUnitClaimCheck.java | 6 +- .../ddm/work/WorkUnitsSizeSummary.java | 50 +++++++ .../impl/ExecuteGobblinWorkflowImpl.java | 34 +++-- .../impl/ProcessWorkUnitsWorkflowImpl.java | 4 +- .../dynamic/ScalingDirectiveSource.java | 2 +- .../temporal/dynamic/WorkforcePlan.java | 2 +- .../workflows/metrics/TemporalEventTimer.java | 6 +- .../impl/GenerateWorkUnitsImplTest.java | 95 +++++++++++- ...rBackedWorkUnitClaimCheckWorkloadTest.java | 109 ++++++++++++++ gobblin-utility/build.gradle | 1 + .../apache/gobblin/util/JobLauncherUtils.java | 19 ++- .../apache/gobblin/util/WorkUnitSizeInfo.java | 138 ++++++++++++++++++ .../gobblin/util/JobLauncherUtilsTest.java | 90 ++++++++++++ .../gobblin/util/WorkUnitSizeInfoTest.java | 104 +++++++++++++ gradle/scripts/dependencyDefinitions.gradle | 1 + 24 files changed, 786 insertions(+), 54 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java create mode 100644 gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java create mode 100644 gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java create mode 100644 
gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java diff --git a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java index bb562bbe81c..c3304ccef0b 100644 --- a/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java +++ b/gobblin-modules/gobblin-troubleshooter/src/main/java/org/apache/gobblin/troubleshooter/AutoTroubleshooterLogAppender.java @@ -44,7 +44,7 @@ /** - * Collects messages from log4j and converts them into issues that are used in {@link AutomaticTroubleshooter}. + * Collects messages from log4j and converts them into {@link Issue}s used by the {@link org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter}. */ @Slf4j @ThreadSafe diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java index 7f694b20322..c726931e953 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/Issue.java @@ -50,7 +50,7 @@ public class Issue { * * It can be used for making programmatic decisions on how to handle and recover from this issue. * - * The code length should be less than {@link Issue.MAX_ISSUE_CODE_LENGTH} + * The code length should be less than {@link Issue#MAX_ISSUE_CODE_LENGTH} * */ private final String code; @@ -71,14 +71,14 @@ public class Issue { * * This is a full name of the class that logged the error or generated the issue. 
* - * The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH} + * The class name length should be less than {@link Issue#MAX_CLASSNAME_LENGTH} * */ private final String sourceClass; /** * If the issue was generated from an exception, then a full exception class name should be stored here. * - * The class name length should be less than {@link Issue.MAX_CLASSNAME_LENGTH} + * The class name length should be less than {@link Issue#MAX_CLASSNAME_LENGTH} */ private final String exceptionClass; diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index 1832ac909de..fa34245e914 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -62,6 +62,7 @@ dependencies { compile (externalDependency.helix) { exclude group: 'io.dropwizard.metrics', module: 'metrics-core' } + compile externalDependency.tdigest compile externalDependency."temporal-sdk" testCompile project(path: ':gobblin-cluster', configuration: 'tests') testCompile project(":gobblin-example") diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java index 862b46c40fe..a43f8039149 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/GenerateWorkUnits.java @@ -30,6 +30,11 @@ /** Activity for generating {@link WorkUnit}s and persisting them to the {@link org.apache.hadoop.fs.FileSystem}, per "job properties" */ @ActivityInterface public interface GenerateWorkUnits { + + public static final String NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = GenerateWorkUnits.class.getName() + ".numWorkUnitsSizeInfoQuantiles"; + public static final int DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES = 10; + + /** @return the number of {@link WorkUnit}s generated and persisted */ @ActivityMethod 
GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index 8344bae6f98..4996c16c1cd 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -23,17 +23,23 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import com.google.api.client.util.Lists; -import com.google.common.io.Closer; - import io.temporal.failure.ApplicationFailure; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import com.google.api.client.util.Lists; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.io.Closer; +import com.tdunning.math.stats.TDigest; + import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.converter.initializer.ConverterInitializerFactory; import org.apache.gobblin.destination.DestinationDatasetHandlerService; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -41,6 +47,7 @@ import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.source.Source; import org.apache.gobblin.source.WorkUnitStreamSource; import org.apache.gobblin.source.workunit.BasicWorkUnitStream; @@ -50,6 +57,7 @@ import 
org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; import org.apache.gobblin.writer.initializer.WriterInitializerFactory; @@ -58,6 +66,39 @@ @Slf4j public class GenerateWorkUnitsImpl implements GenerateWorkUnits { + /** [Internal, implementation class] Size sketch/digest of a collection of {@link MultiWorkUnit}s */ + @Data + @VisibleForTesting + protected static class WorkUnitsSizeDigest { + private final long totalSize; + /** a top-level work unit has no parent - a root */ + private final TDigest topLevelWorkUnitsSizeDigest; + /** a constituent work unit has no children - a leaf */ + private final TDigest constituentWorkUnitsSizeDigest; + + public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) { + Preconditions.checkArgument(numQuantiles > 0, "numQuantiles must be > 0"); + final double quantilesWidth = 1.0 / numQuantiles; + + List topLevelQuantileValues = getQuantiles(topLevelWorkUnitsSizeDigest, numQuantiles); + List constituentQuantileValues = getQuantiles(constituentWorkUnitsSizeDigest, numQuantiles); + return new WorkUnitsSizeSummary( + totalSize, + topLevelWorkUnitsSizeDigest.size(), constituentWorkUnitsSizeDigest.size(), + numQuantiles, quantilesWidth, + topLevelQuantileValues, constituentQuantileValues); + } + + private static List getQuantiles(TDigest digest, int numQuantiles) { + List quantileMinSizes = Lists.newArrayList(); + for (int i = 1; i <= numQuantiles; i++) { + quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles)); + } + return quantileMinSizes; + } + } + + @Override public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) { // TODO: 
decide whether to acquire a job lock (as MR did)! @@ -80,12 +121,18 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi FileSystem fs = JobStateUtils.openFileSystem(jobState); fs.mkdirs(workDirRoot); - Set resourcesToCleanUp = new HashSet<>(); - List workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, resourcesToCleanUp); + Set pathsToCleanUp = new HashSet<>(); + List workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, pathsToCleanUp); + + int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState); + WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles); + log.info("Discovered WorkUnits: {}", wuSizeSummary); + JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs); - JobStateUtils.writeJobState(jobState, workDirRoot, fs); + JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete - return new GenerateWorkUnitsResult(jobState.getTaskCount(), resourcesToCleanUp); + String sourceClassName = JobStateUtils.getSourceClassName(jobState); + return new GenerateWorkUnitsResult(jobState.getTaskCount(), sourceClassName, wuSizeSummary, pathsToCleanUp); } catch (ReflectiveOperationException roe) { String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId(); log.error(errMsg, roe); @@ -101,7 +148,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi } protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer, - Set resourcesToCleanUp) + Set pathsToCleanUp) throws ReflectiveOperationException { Source source = JobStateUtils.createSource(jobState); WorkUnitStream workUnitStream = source instanceof 
WorkUnitStreamSource @@ -127,7 +174,7 @@ protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS DestinationDatasetHandlerService datasetHandlerService = closer.register( new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create())); WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream); - resourcesToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream)); + pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream)); // initialize writer and converter(s) // TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!) closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize(); @@ -151,7 +198,7 @@ protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS } protected static Set calculateWorkDirsToCleanup(WorkUnitStream workUnitStream) { - Set resourcesToCleanUp = new HashSet<>(); + Set workDirPaths = new HashSet<>(); // Validate every workunit if they have the temp dir props since some workunits may be commit steps Iterator workUnitIterator = workUnitStream.getWorkUnits(); while (workUnitIterator.hasNext()) { @@ -159,16 +206,16 @@ protected static Set calculateWorkDirsToCleanup(WorkUnitStream workUnitS if (workUnit.isMultiWorkUnit()) { List workUnitList = ((MultiWorkUnit) workUnit).getWorkUnits(); for (WorkUnit wu : workUnitList) { - resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu)); + // WARNING/TODO: NOT resilient to nested multi-workunits... should it be? 
+ workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(wu)); } } else { - resourcesToCleanUp.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit)); + workDirPaths.addAll(collectTaskStagingAndOutputDirsFromWorkUnit(workUnit)); } } - return resourcesToCleanUp; + return workDirPaths; } - private static Set collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit workUnit) { Set resourcesToCleanUp = new HashSet<>(); if (workUnit.contains(ConfigurationKeys.WRITER_STAGING_DIR)) { @@ -183,4 +230,41 @@ private static Set collectTaskStagingAndOutputDirsFromWorkUnit(WorkUnit } return resourcesToCleanUp; } + + /** @return the {@link WorkUnitsSizeDigest} for `workUnits` */ + protected static WorkUnitsSizeDigest digestWorkUnitsSize(List workUnits) { + AtomicLong totalSize = new AtomicLong(0L); + TDigest topLevelWorkUnitsDigest = TDigest.createDigest(100); + TDigest constituentWorkUnitsDigest = TDigest.createDigest(100); + + Iterator workUnitIterator = workUnits.iterator(); + while (workUnitIterator.hasNext()) { + WorkUnit workUnit = workUnitIterator.next(); + if (workUnit.isMultiWorkUnit()) { + List subWorkUnitsList = ((MultiWorkUnit) workUnit).getWorkUnits(); + AtomicLong mwuAggSize = new AtomicLong(0L); + // WARNING/TODO: NOT resilient to nested multi-workunits... should it be? 
+ subWorkUnitsList.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).forEach(wuSize -> { + constituentWorkUnitsDigest.add(wuSize); + mwuAggSize.addAndGet(wuSize); + }); + totalSize.addAndGet(mwuAggSize.get()); + topLevelWorkUnitsDigest.add(mwuAggSize.get()); + } else { + long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0); + totalSize.addAndGet(wuSize); + constituentWorkUnitsDigest.add(wuSize); + topLevelWorkUnitsDigest.add(wuSize); + } + } + + // TODO - decide whether helpful/necessary to `.compress()` + topLevelWorkUnitsDigest.compress(); + constituentWorkUnitsDigest.compress(); + return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest); + } + + public static int getConfiguredNumSizeSummaryQuantiles(State state) { + return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES); + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java index 37d1f904775..52da7b5299b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java @@ -46,6 +46,7 @@ import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.source.Source; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.ParallelRunner; @@ -76,9 +77,14 @@ public static FileSystem openFileSystem(JobState jobState) throws IOException { return Help.loadFileSystemForUriForce(getFileSystemUri(jobState), jobState); } - /** @return a new 
instance of {@link Source} identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */ + /** @return the FQ class name, presumed configured as {@link ConfigurationKeys#SOURCE_CLASS_KEY} */ + public static String getSourceClassName(JobState jobState) { + return jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY); + } + + /** @return a new instance of {@link Source}, identified by {@link ConfigurationKeys#SOURCE_CLASS_KEY} */ public static Source createSource(JobState jobState) throws ReflectiveOperationException { - Class sourceClass = Class.forName(jobState.getProp(ConfigurationKeys.SOURCE_CLASS_KEY)); + Class sourceClass = Class.forName(getSourceClassName(jobState)); log.info("Creating source: '{}'", sourceClass.getName()); Source source = new SourceDecorator<>( Source.class.cast(sourceClass.newInstance()), @@ -145,7 +151,10 @@ public static Path getTaskStateStorePath(JobState jobState, FileSystem fs) { return fs.makeQualified(jobOutputPath); } - /** write serialized {@link WorkUnit}s in parallel into files named after the jobID and task IDs */ + /** + * write serialized {@link WorkUnit}s in parallel into files named to tunnel {@link org.apache.gobblin.util.WorkUnitSizeInfo}. + * {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} (and possibly others) may later recover such size info. 
+ */ public static void writeWorkUnits(List workUnits, Path workDirRootPath, JobState jobState, FileSystem fs) throws IOException { String jobId = jobState.getJobId(); @@ -159,7 +168,8 @@ public static void writeWorkUnits(List workUnits, Path workDirRootPath JobLauncherUtils.WorkUnitPathCalculator pathCalculator = new JobLauncherUtils.WorkUnitPathCalculator(); int i = 0; for (WorkUnit workUnit : workUnits) { - Path workUnitFile = pathCalculator.calcNextPath(workUnit, jobId, targetDirPath); + // tunnel each WU's size info via its filename, for `EagerFsDirBackedWorkUnitClaimCheckWorkload#extractTunneledWorkUnitSizeInfo` + Path workUnitFile = pathCalculator.calcNextPathWithTunneledSizeInfo(workUnit, jobId, targetDirPath); if (i++ == 0) { log.info("Writing work unit file [first of {}]: '{}'", workUnits.size(), workUnitFile); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java index ac75971d2b2..24f6363093a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkload.java @@ -19,12 +19,19 @@ import java.net.URI; import java.util.Comparator; +import java.util.Optional; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; +import org.apache.gobblin.util.Id; +import org.apache.gobblin.util.WorkUnitSizeInfo; /** @@ -33,6 +40,7 @@ */ @lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization @lombok.ToString(callSuper = true) +@Slf4j public class 
EagerFsDirBackedWorkUnitClaimCheckWorkload extends AbstractEagerFsDirBackedWorkload { private EventSubmitterContext eventSubmitterContext; @@ -43,8 +51,9 @@ public EagerFsDirBackedWorkUnitClaimCheckWorkload(URI fileSystemUri, String hdfs @Override protected WorkUnitClaimCheck fromFileStatus(FileStatus fileStatus) { - // begin by setting all correlators to empty - return new WorkUnitClaimCheck("", this.getFileSystemUri(), fileStatus.getPath().toString(), this.eventSubmitterContext); + // begin by setting all correlators to empty string - later we'll `acknowledgeOrdering()` + Path filePath = fileStatus.getPath(); + return new WorkUnitClaimCheck("", this.getFileSystemUri(), filePath.toString(), extractTunneledWorkUnitSizeInfo(filePath), this.eventSubmitterContext); } @Override @@ -58,4 +67,20 @@ protected void acknowledgeOrdering(int index, WorkUnitClaimCheck item) { // later, after the post-total-ordering indices are know, use each item's index as its correlator item.setCorrelator(Integer.toString(index)); } + + /** + * @return the {@link WorkUnitSizeInfo}, when encoded in the filename; otherwise {@link WorkUnitSizeInfo#empty()} when no size info about {@link WorkUnit} + * @see org.apache.gobblin.util.JobLauncherUtils.WorkUnitPathCalculator#calcNextPathWithTunneledSizeInfo(WorkUnit, String, Path) + */ + protected static WorkUnitSizeInfo extractTunneledWorkUnitSizeInfo(Path filePath) { + String fileName = filePath.getName(); + Optional optSizeInfo = Optional.empty(); + try { + String maybeEncodedSizeInfo = Id.parse(fileName.substring(0, fileName.lastIndexOf('.'))).getName(); // strip extension + optSizeInfo = WorkUnitSizeInfo.decode(maybeEncodedSizeInfo); + } catch (Exception e) { // log, but swallow any `Id.parse` error + log.warn("Filename NOT `Id.parse`able: '" + filePath + "' - " + e.getMessage()); + } + return optSizeInfo.orElse(WorkUnitSizeInfo.empty()); + } } diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java index 5f798055e2f..f30998ae857 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java @@ -33,7 +33,10 @@ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor public class GenerateWorkUnitsResult { + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" @NonNull private int generatedWuCount; + @NonNull private String sourceClass; + @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary; // Resources that the Temporal Job Launcher should clean up for Gobblin temporary work directory paths in writers @NonNull private Set workDirPathsToDelete; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java index ff3c9eb6f20..02ba87b3fae 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/PriorJobStateWUProcessingSpec.java @@ -48,8 +48,7 @@ @EqualsAndHashCode(callSuper = true) // to prevent findbugs warning - "equals method overrides equals in superclass and may not be symmetric" @NoArgsConstructor // IMPORTANT: for jackson (de)serialization public class PriorJobStateWUProcessingSpec extends WUProcessingSpec { - @NonNull - private List> tags = new ArrayList<>(); + @NonNull private List> tags = new ArrayList<>(); @NonNull private String metricsSuffix = 
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX; public PriorJobStateWUProcessingSpec(URI fileSystemUri, String workUnitsDir, EventSubmitterContext eventSubmitterContext) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java index 5cb2064b116..e454c9ceef5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java @@ -21,18 +21,19 @@ import org.apache.hadoop.fs.Path; -import com.fasterxml.jackson.annotation.JsonIgnore; - import lombok.Data; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt; import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; +import org.apache.gobblin.util.WorkUnitSizeInfo; /** @@ -47,6 +48,7 @@ public class WorkUnitClaimCheck implements FileSystemApt, FileSystemJobStateful @NonNull private String correlator; @NonNull private URI fileSystemUri; @NonNull private String workUnitPath; + @NonNull private WorkUnitSizeInfo workUnitSizeInfo; @NonNull private EventSubmitterContext eventSubmitterContext; @JsonIgnore // (because no-arg method resembles 'java bean property') diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java new file mode 100644 index 00000000000..3ea426c284b --- /dev/null +++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.util.List; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; + + +/** + * Total size, counts, and size distributions for a collection of {@link MultiWorkUnit}s, both with regard to top-level (possibly multi) {@link WorkUnit}s + * and individual constituent (purely {@link WorkUnit}s), where: + * * a top-level work unit is one with no parent - a root + * * a constituent work unit is one with no children - a leaf + * @see org.apache.gobblin.util.WorkUnitSizeInfo + */ +@Data +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WorkUnitsSizeSummary { + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... 
@RequiredArgsConstructor" + @NonNull private long totalSize; + @NonNull private long topLevelWorkUnitsCount; + @NonNull private long constituentWorkUnitsCount; + @NonNull private int quantilesCount; + @NonNull private double quantilesWidth; + @NonNull private List topLevelQuantilesMinSizes; + @NonNull private List constituentQuantilesMinSizes; +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 2aa2a7e6495..1d5a63b7366 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -49,6 +49,7 @@ import org.apache.gobblin.temporal.ddm.work.DirDeletionResult; import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats; import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow; @@ -97,14 +98,15 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { @Override public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) { TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); - timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); - EventTimer timer = timerFactory.createJobTimer(); + timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` + EventTimer jobSuccessTimer = timerFactory.createJobTimer(); Optional generateWorkUnitResultsOpt = Optional.empty(); 
WUProcessingSpec wuSpec = createProcessingSpec(jobProps, eventSubmitterContext); boolean isSuccessful = false; try { generateWorkUnitResultsOpt = Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext)); - int numWUsGenerated = generateWorkUnitResultsOpt.get().getGeneratedWuCount(); + WorkUnitsSizeSummary wuSizeSummary = generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary(); + int numWUsGenerated = safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary); int numWUsCommitted = 0; CommitStats commitStats = CommitStats.createEmpty(); if (numWUsGenerated > 0) { @@ -112,18 +114,16 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event commitStats = processWUsWorkflow.process(wuSpec); numWUsCommitted = commitStats.getNumCommittedWorkUnits(); } - timer.stop(); + jobSuccessTimer.stop(); isSuccessful = true; return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats()); } catch (Exception e) { // Emit a failed GobblinTrackingEvent to record job failures - timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); + timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit(); // update GaaS: `ExecutionStatus.FAILED`; `TimingEvent.JOB_END_TIME` throw ApplicationFailure.newNonRetryableFailureWithCause( String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)), - e.getClass().getName(), - e - ); + e.getClass().getName(), e); } finally { // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid flight try { @@ -138,9 +138,7 @@ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext event if (isSuccessful) { throw ApplicationFailure.newNonRetryableFailureWithCause( String.format("Failed cleaning Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)), - e.getClass().getName(), - e - ); + e.getClass().getName(), e); } log.error("Failed to cleanup 
work dirs", e); } @@ -210,4 +208,18 @@ protected static Set calculateWorkDirsToDelete(String jobId, Set } return resultSet; } + + /** + * Historical practice counted {@link org.apache.gobblin.source.workunit.WorkUnit}s with {@link int} (see e.g. {@link JobState#getTaskCount()}). + * Updated counting now uses {@link long}, although much code still presumes {@link int}. While we don't presently anticipate jobs exceeding 2 billion + * `WorkUnit`s, if it were ever to happen, this method will fail-fast to flag the need to address. + * @throws {@link IllegalArgumentException} if the count exceeds {@link Integer#MAX_VALUE} + */ + protected static int safelyCastNumConstituentWorkUnitsOrThrow(WorkUnitsSizeSummary wuSizeSummary) { + long n = wuSizeSummary.getConstituentWorkUnitsCount(); + if (n > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Too many constituent WorkUnits (" + n + ") - exceeds `Integer.MAX_VALUE`!"); + } + return (int) n; + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index ccd29794919..6402e473bf7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -66,8 +66,8 @@ private CommitStats performWork(WUProcessingSpec workSpec) { try { jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec)); } catch (Exception e) { - log.error("Exception occured during loading jobState", e); - throw new RuntimeException("Exception occured during loading jobState", e); + log.error("Error loading jobState", e); + throw new RuntimeException("Error loading jobState", e); } searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties()); diff 
--git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java index 1b0f79e78d6..764725dd903 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java @@ -22,7 +22,7 @@ /** An opaque source of {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s */ -public interface ScalingDirectiveSource extends Cloneable { +public interface ScalingDirectiveSource { /** @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer directives than previously returned */ List getScalingDirectives() throws IOException; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java index dde55556442..2a070e2ed9b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java @@ -167,7 +167,7 @@ WorkerProfile peepProfile(String profileName) throws WorkforceProfiles.UnknownPr return profiles.getOrThrow(profileName); } - /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it should NEVER {@link WorkforceProfiles.UnknownProfileException} */ + /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it should NEVER throw {@link WorkforceProfiles.UnknownProfileException} */ @VisibleForTesting WorkerProfile peepBaselineProfile() throws WorkforceProfiles.UnknownProfileException { return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java index c9d9e940ec6..e1ac601986b 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java @@ -30,7 +30,7 @@ /** - * Boiler plate for tracking elapsed time of events that is compatible with {@link Workflow} + * Boilerplate for tracking elapsed time of events that is compatible with {@link Workflow} * by using activities to record time * * This class is very similar to {@link TimingEvent} but uses {@link Workflow} compatible APIs. It's possible to refactor @@ -106,9 +106,9 @@ public TemporalEventTimer create(String eventName) { * @return a timer that emits an event at the beginning of the job and a completion event ends at the end of the job */ public TemporalEventTimer createJobTimer() { - TemporalEventTimer startTimer = create(TimingEvent.LauncherTimings.JOB_START); + TemporalEventTimer startTimer = create(TimingEvent.LauncherTimings.JOB_START); // update GaaS: `ExecutionStatus.RUNNING` startTimer.stop(Instant.EPOCH); // Emit start job event containing a stub end time - // GaaS job status monitor tracks for SUCCEEDED events or FAILED events for job completion + // [upon `.stop()`] update GaaS: `ExecutionStatus.RUNNING`, `TimingEvent.JOB_END_TIME`: return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED, startTimer.startTime); } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java index 86c5ac12de1..8c94783a7dd 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java +++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java @@ -24,10 +24,12 @@ import org.testng.Assert; import org.testng.annotations.Test; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.source.workunit.BasicWorkUnitStream; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitStream; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; public class GenerateWorkUnitsImplTest { @@ -36,7 +38,7 @@ public class GenerateWorkUnitsImplTest { public void testFetchesWorkDirsFromWorkUnits() { List workUnits = new ArrayList<>(); for (int i = 0; i < 5; i++) { - WorkUnit workUnit = new WorkUnit(); + WorkUnit workUnit = WorkUnit.createEmpty(); workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/" + i); workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/" + i); workUnit.setProp("qualitychecker.row.err.file", "/tmp/jobId/row-err/file"); @@ -54,9 +56,9 @@ public void testFetchesWorkDirsFromWorkUnits() { public void testFetchesWorkDirsFromMultiWorkUnits() { List workUnits = new ArrayList<>(); for (int i = 0; i < 5; i++) { - MultiWorkUnit multiWorkUnit = new MultiWorkUnit(); + MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty(); for (int j = 0; j < 3; j++) { - WorkUnit workUnit = new WorkUnit(); + WorkUnit workUnit = WorkUnit.createEmpty(); workUnit.setProp("writer.staging.dir", "/tmp/jobId/task-staging/"); workUnit.setProp("writer.output.dir", "/tmp/jobId/task-output/"); workUnit.setProp("qualitychecker.row.err.file", "/tmp/jobId/row-err/file"); @@ -76,9 +78,9 @@ public void testFetchesWorkDirsFromMultiWorkUnits() { public void testFetchesUniqueWorkDirsFromMultiWorkUnits() { List workUnits = new ArrayList<>(); for (int i = 0; i < 5; i++) { - MultiWorkUnit multiWorkUnit = new MultiWorkUnit(); + MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty(); 
for (int j = 0; j < 3; j++) { - WorkUnit workUnit = new WorkUnit(); + WorkUnit workUnit = WorkUnit.createEmpty(); // Each MWU will have its own staging and output dir workUnit.setProp("writer.staging.dir", "/tmp/jobId/" + i + "/task-staging/"); workUnit.setProp("writer.output.dir", "/tmp/jobId/" + i + "task-output/"); @@ -94,4 +96,87 @@ public void testFetchesUniqueWorkDirsFromMultiWorkUnits() { Set output = GenerateWorkUnitsImpl.calculateWorkDirsToCleanup(workUnitStream); Assert.assertEquals(output.size(), 11); } + + @Test + public void testDigestWorkUnitsSize() { + int numSingleWorkUnits = 5; + int numMultiWorkUnits = 15; + long singleWorkUnitSizeFactor = 100L; + long multiWorkUnitSizeFactor = 70L; + List workUnits = new ArrayList<>(); + + // configure size of non-multi-work-units (increments of `singleWorkUnitSizeFactor`, starting from 0) + for (int i = 0; i < numSingleWorkUnits; i++) { + workUnits.add(createWorkUnitOfSize(i * singleWorkUnitSizeFactor)); + } + + // configure size of multi-work-units, each containing between 1 and 4 sub-work-unit children + for (int i = 0; i < numMultiWorkUnits; i++) { + MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty(); + int subWorkUnitCount = 1 + (i % 4); // 1 to 4 + for (int j = 0; j < subWorkUnitCount; j++) { + multiWorkUnit.addWorkUnit(createWorkUnitOfSize((j + 1) * multiWorkUnitSizeFactor)); + } + workUnits.add(multiWorkUnit); + } + + // calc expectations + long expectedTotalSize = 0L; + int expectedNumTopLevelWorkUnits = numSingleWorkUnits + numMultiWorkUnits; + int expectedNumConstituentWorkUnits = numSingleWorkUnits; + for (int i = 0; i < numSingleWorkUnits; i++) { + expectedTotalSize += i * singleWorkUnitSizeFactor; + } + for (int i = 0; i < numMultiWorkUnits; i++) { + int numSubWorkUnitsThisMWU = 1 + (i % 4); + expectedNumConstituentWorkUnits += numSubWorkUnitsThisMWU; + for (int j = 0; j < numSubWorkUnitsThisMWU; j++) { + expectedTotalSize += (j + 1) * multiWorkUnitSizeFactor; + } + } + + 
GenerateWorkUnitsImpl.WorkUnitsSizeDigest wuSizeDigest = GenerateWorkUnitsImpl.digestWorkUnitsSize(workUnits); + + Assert.assertEquals(wuSizeDigest.getTotalSize(), expectedTotalSize); + Assert.assertEquals(wuSizeDigest.getTopLevelWorkUnitsSizeDigest().size(), expectedNumTopLevelWorkUnits); + Assert.assertEquals(wuSizeDigest.getConstituentWorkUnitsSizeDigest().size(), expectedNumConstituentWorkUnits); + + int numQuantilesDesired = expectedNumTopLevelWorkUnits; // for simpler math during quantile verification (below) + WorkUnitsSizeSummary wuSizeInfo = wuSizeDigest.asSizeSummary(numQuantilesDesired); + Assert.assertEquals(wuSizeInfo.getTotalSize(), expectedTotalSize); + Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsCount(), expectedNumTopLevelWorkUnits); + Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsCount(), expectedNumConstituentWorkUnits); + Assert.assertEquals(wuSizeInfo.getQuantilesCount(), numQuantilesDesired); + Assert.assertEquals(wuSizeInfo.getQuantilesWidth(), 1.0 / expectedNumTopLevelWorkUnits); + Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeInfo` param + Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeInfo` param + + // expected sizes for (n=5) top-level non-multi-WUs: (1x) 0, (1x) 100, (1x) 200, (1x) 300, (1x) 400 + // expected sizes for (n=15) top-level multi-WUs: [a] (4x) 70; [b] (4x) 210 (= 70+140); [c] (4x) 420 (= 70+140+210); [d] (3x) 700 (= 70+140+210+280) + Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().toArray(), + new Double[]{ + 70.0, 70.0, 70.0, 70.0, // 4x MWU [a] + 100.0, 200.0, // non-MWU [2, 3] + 210.0, 210.0, 210.0, 210.0, // 4x MWU [b] + 300.0, 400.0, // non-MWU [4, 5] + 420.0, 420.0, 420.0, 420.0, // 4x MWU [c] + 700.0, 700.0, 700.0, 700.0 }); // 3x MWU [d] + "100-percentile" (all WUs) + + // expected sizes for (n=36) constituents from multi-WUs: [m] (15x = 4+4+4+3) 70; [n] (11x = 
4+4+3) 140; [o] (7x = 4+3) 210; [p] (3x) 280 + Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().toArray(), + new Double[]{ + 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, 70.0, // (per 15x MWU [m]) - 15/41 * 20 ~ 7.3 + 100.0, // non-MWU [2] + 140.0, 140.0, 140.0, 140.0, 140.0, // (per 11x MWU [n]) - (15+1+11/41) * 20 ~ 13.2 | 13.2 - 8 = 5.2 + 200.0, // non-MWU [3] + 210.0, 210.0, 210.0, // (per 7x MWU [o]) - (15+1+11+1+7/41) * 20 ~ 17.0 | 17.0 - 14 = 3 + 280.0, 280.0, // 3x MWU [p] ... (15+1+11+1+7+3/41) * 20 ~ 18.5 | 18.5 - 17 = 2 + 400.0 }); // with only one 20-quantile remaining, non-MWU [5] completes the "100-percentile" (all WUs) + } + + public static WorkUnit createWorkUnitOfSize(long size) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size); + return workUnit; + } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java new file mode 100644 index 00000000000..6df60857eba --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/work/EagerFsDirBackedWorkUnitClaimCheckWorkloadTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.work; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.mockito.Mockito; +import org.mockito.MockedStatic; + +import org.apache.gobblin.temporal.util.nesting.work.Workload; +import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.WorkUnitSizeInfo; + + +/** Tests for {@link EagerFsDirBackedWorkUnitClaimCheckWorkload} */ +public class EagerFsDirBackedWorkUnitClaimCheckWorkloadTest { + + private EagerFsDirBackedWorkUnitClaimCheckWorkload workload; + private EventSubmitterContext eventSubmitterContext; + private FileSystem mockFileSystem; + + @BeforeMethod + public void setUp() throws Exception { + URI fileSystemUri = new URI("hdfs://localhost:9000"); + String hdfsDir = "/test/dir"; + eventSubmitterContext = Mockito.mock(EventSubmitterContext.class); + workload = Mockito.spy(new EagerFsDirBackedWorkUnitClaimCheckWorkload(fileSystemUri, hdfsDir, eventSubmitterContext)); + mockFileSystem = Mockito.mock(FileSystem.class); + + MockedStatic mockedHadoopUtils = Mockito.mockStatic(HadoopUtils.class); + 
mockedHadoopUtils.when(() -> HadoopUtils.getFileSystem(Mockito.any(), Mockito.any())).thenReturn(mockFileSystem); + } + + @Test + public void testExtractWorkUnitSizeInfo() throws Exception { + // mock `FileStatus` having an encoded WorkUnitSizeInfo as filename + List fileStatuses = Arrays.asList( + createMockFileStatus("/test/dir/multitask_n=3-total=100-median=100.0-mean=100.0-stddev=0.0_0.mwu"), + createMockFileStatus("/test/dir/multitask_n=2-total=200-median=100.0-mean=100.0-stddev=0.0_1.mwu"), + createMockFileStatus("/test/dir/task_n=1-total=300-median=100.0-mean=100.0-stddev=0.0_5.wu"), + createMockFileStatus("/test/dir/multitask_n=4-total=400-median=100.0-mean=100.0-stddev=0.0_30.mwu"), + createMockFileStatus("/test/dir/task_n=1-total=500-median=100.0-mean=100.0-stddev=0.0_2.wu") + ); + + Mockito.when(mockFileSystem.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))) + .thenReturn(fileStatuses.toArray(new FileStatus[0])); + + Optional> span = workload.getSpan(0, 5); + Assert.assertTrue(span.isPresent()); + + Workload.WorkSpan workSpan = span.get(); + Assert.assertEquals(workSpan.getNumElems(), fileStatuses.size()); + + Iterable workSpanIterable = () -> workSpan; + List wuClaimChecks = StreamSupport.stream(workSpanIterable.spliterator(), false) + .collect(Collectors.toList()); + + // note: since ordering is based on filename, it will not be the same as `fileStatuses` above + for (WorkUnitClaimCheck workUnitClaimCheck : wuClaimChecks) { + WorkUnitSizeInfo sizeInfo = workUnitClaimCheck.getWorkUnitSizeInfo(); + Assert.assertNotNull(sizeInfo); + int n = sizeInfo.getNumConstituents(); + long expectedTotalSize = (n == 1 && workUnitClaimCheck.getWorkUnitPath().endsWith("_5.wu")) ? 300 : + n == 1 ? 500 : + n == 2 ? 200 : + n == 3 ? 100 : + n == 4 ? 
400 : + -1; // unexpected (sentinel) + Assert.assertEquals(sizeInfo.getTotalSize(), expectedTotalSize); + } + } + + private static FileStatus createMockFileStatus(String path) { + FileStatus fileStatus = Mockito.mock(FileStatus.class); + Path filePath = new Path(path); + Mockito.when(fileStatus.getPath()).thenReturn(filePath); + return fileStatus; + } +} + diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle index b916f214cdc..80e73afcfdc 100644 --- a/gobblin-utility/build.gradle +++ b/gobblin-utility/build.gradle @@ -40,6 +40,7 @@ dependencies { compile externalDependency.metricsCore compile externalDependency.guavaretrying compile externalDependency.guice + compile externalDependency.tdigest compile externalDependency.typesafeConfig compile externalDependency.commonsPool compile externalDependency.hadoopClientCommon diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java index f70f11f752e..d1f97681eec 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; +import lombok.extern.slf4j.Slf4j; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.cache.Cache; @@ -40,8 +42,6 @@ import com.google.common.collect.Lists; import com.google.common.io.Closer; -import lombok.extern.slf4j.Slf4j; - import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.source.workunit.MultiWorkUnit; @@ -66,13 +66,26 @@ public class JobLauncherUtils { public static class WorkUnitPathCalculator { private final AtomicInteger nextMultiWorkUnitTaskId = new AtomicInteger(0); - // Serialize each work unit into a file named after the task ID + /** 
@return `Path` beneath `basePath` to serialize `workUnit`, with file named after the task ID (itself named after the job ID) */ public Path calcNextPath(WorkUnit workUnit, String jobId, Path basePath) { String workUnitFileName = workUnit.isMultiWorkUnit() ? JobLauncherUtils.newMultiTaskId(jobId, nextMultiWorkUnitTaskId.getAndIncrement()) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION : workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION; return new Path(basePath, workUnitFileName); } + + /** + * Calc where to serialize {@link WorkUnit}, using a filename that tunnels {@link WorkUnitSizeInfo} (vs. repeating the task/job ID, as was legacy practice) + * This provides a direct and simple way to tunnel limited, but crucial, metadata that incurs no additional access cost (nor adds FS load). + * @return `Path` beneath `basePath` to serialize `workUnit` + */ + public Path calcNextPathWithTunneledSizeInfo(WorkUnit workUnit, String jobId, Path basePath) { + String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode(); + String workUnitFileName = workUnit.isMultiWorkUnit() + ? Id.MultiTask.create(encodedSizeInfo, nextMultiWorkUnitTaskId.getAndIncrement()) + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION + : Id.Task.create(encodedSizeInfo, workUnit.getPropAsInt(ConfigurationKeys.TASK_KEY_KEY)) + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION; + return new Path(basePath, workUnitFileName); + } } /** diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java new file mode 100644 index 00000000000..107590407a5 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.tdunning.math.stats.TDigest; + +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; + + +/** + * Bare-bones size information about a {@link WorkUnit}, possibly a {@link MultiWorkUnit}, where a constituent work unit is one with no children - a leaf. + * + * Measurement currently requires the `WorkUnit` to define {@link ServiceConfigKeys#WORK_UNIT_SIZE}, otherwise sizes will be 0 with merely the count of + * constituent `WorkUnits`. For the most part, at present, that key is supplied only by {@link org.apache.gobblin.data.management.copy.CopySource}. + * Nonetheless, the "contract" for any {@link org.apache.gobblin.source.Source} is both clear and reasonable: just add "size" to your `WorkUnit`s to + * participate. 
+ * + * Some sources might count bytes, others num records, possibly with those size-weighted; and of course not all sources extract a definite + * amount of data, known up front. In such cases, the {@link #numConstituents} (aka. parallelism potential) may be most informative. + */ +@Data +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WorkUnitSizeInfo { + // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite - "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor" + @NonNull private int numConstituents; + @NonNull private long totalSize; + @NonNull private double medianSize; + @NonNull private double meanSize; + @NonNull private double stddevSize; + + /** @return the 'zero' {@link WorkUnitSizeInfo} */ + public static WorkUnitSizeInfo empty() { + return new WorkUnitSizeInfo(0, 0, 0.0, 0.0, 0.0); + } + + /** + * convenience factory to measure a {@link WorkUnit} - preferable to direct ctor call + * @return {@link #empty()} when the `WorkUnit` is not measurable by defining {@link ServiceConfigKeys#WORK_UNIT_SIZE} + */ + public static WorkUnitSizeInfo forWorkUnit(WorkUnit workUnit) { + // NOTE: redundant `instanceof` merely to appease FindBugs - "Unchecked/unconfirmed cast ..." + if (!workUnit.isMultiWorkUnit() || !(workUnit instanceof MultiWorkUnit)) { + long wuSize = workUnit.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0); + return new WorkUnitSizeInfo(1, wuSize, wuSize, wuSize, 0.0); + } else { + // WARNING/TODO: NOT resilient to nested multi-workunits... should it be?
+ List<WorkUnit> subWorkUnitsList = ((MultiWorkUnit) workUnit).getWorkUnits(); + if (subWorkUnitsList.isEmpty()) { + return WorkUnitSizeInfo.empty(); + } + int n = subWorkUnitsList.size(); + TDigest constituentWorkUnitsDigest = TDigest.createDigest(100); + AtomicLong totalSize = new AtomicLong(0L); + List<Long> subWorkUnitSizes = subWorkUnitsList.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)) + .boxed().collect(Collectors.toList()); + subWorkUnitSizes.forEach(wuSize -> { + constituentWorkUnitsDigest.add(wuSize); + totalSize.addAndGet(wuSize); + }); + double mean = totalSize.get() * 1.0 / n; + double variance = subWorkUnitSizes.stream().mapToDouble(wuSize -> { + double meanDiff = wuSize - mean; + return meanDiff * meanDiff; + }).sum() / n; + double stddev = Math.sqrt(variance); + double median = constituentWorkUnitsDigest.quantile(0.5); + return new WorkUnitSizeInfo(n, totalSize.get(), median, mean, stddev); + } + } + + /** + * @return stringified, human-readable prop+value encoding that may be inverted with {@link #decode(String)} + * + * NOTE: The resulting encoded form will be between 42 and 117 chars: + * - presuming - a 32-bit int (max 10 digits), a 64-bit long (max 19 digits), and a 64-bit double (max 19 digits) + * * 86 digits maximum for the values - 1*int + 1*long + 3*double = 10 + 19 + 3*19 = 86 + * * 11 digits minimum for the values - 1*int + 1*long + 3*double = 1 + 1 + 3*3 = 11 + * * 22 digits for the names [1+5+6+4+6] + * * 9 digits for additional syntax [5+4] + * = 117 digits (max) + * = 42 digits (min) + */ + @JsonIgnore // (because no-arg method resembles 'java bean property') + public String encode() { + return String.format("n=%d-total=%d-median=%.2f-mean=%.2f-stddev=%.2f", + numConstituents, totalSize, medianSize, meanSize, stddevSize); + } + + private static final String DECODING_REGEX = "^n=(\\d+)-total=(\\d+)-median=(\\d+(?:\\.\\d+)?)-mean=(\\d+(?:\\.\\d+)?)-stddev=(\\d+(?:\\.\\d+)?)$"; + private static final Pattern
decodingPattern = Pattern.compile(DECODING_REGEX); + + /** @return the parsed size info, when `encoded` is in {@link WorkUnitSizeInfo#encode()}-compatible form; otherwise {@link Optional#empty()} */ + public static Optional decode(String encoded) { + Matcher decoding = decodingPattern.matcher(encoded); + if (!decoding.matches()) { + return Optional.empty(); + } else { + return Optional.of(new WorkUnitSizeInfo( + Integer.parseInt(decoding.group(1)), + Long.parseLong(decoding.group(2)), + Double.parseDouble(decoding.group(3)), + Double.parseDouble(decoding.group(4)), + Double.parseDouble(decoding.group(5)) + )); + } + } +} diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java index c11fcc460c4..9bdae9ffea4 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/JobLauncherUtilsTest.java @@ -26,12 +26,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.LoggerFactory; + import org.testng.Assert; import org.testng.annotations.Test; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.Extract.TableType; import org.apache.gobblin.source.workunit.MultiWorkUnit; @@ -48,6 +50,8 @@ public class JobLauncherUtilsTest { private static final String JOB_NAME = "foo"; private static final Pattern PATTERN = Pattern.compile("job_" + JOB_NAME + "_\\d+"); + private final Path wuBasePath = new Path("/abs/base/path"); + private final String wuPathJobId = Id.Task.create("test_wu_path_JobId", 6).toString(); private String jobId; @Test @@ -87,6 +91,92 @@ public void 
testFlattenWorkUnits() { Assert.assertEquals(JobLauncherUtils.flattenWorkUnits(workUnitsAndMultiWorkUnits).size(), 9); } + @Test + public void testCalcNextPathSingleWorkUnit() { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1"); + + JobLauncherUtils.WorkUnitPathCalculator pathCalc = new JobLauncherUtils.WorkUnitPathCalculator(); + Path resultPath = pathCalc.calcNextPath(workUnit, wuPathJobId, wuBasePath); + + Assert.assertEquals(resultPath, new Path(wuBasePath, "task1" + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION)); + } + + @Test + public void testCalcNextPathWithTunneledSizeInfoSingleWorkUnit() { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, "task1"); + workUnit.setProp(ConfigurationKeys.TASK_KEY_KEY, "789"); + workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, "120"); + + JobLauncherUtils.WorkUnitPathCalculator pathCalc = new JobLauncherUtils.WorkUnitPathCalculator(); + Path resultPath = pathCalc.calcNextPathWithTunneledSizeInfo(workUnit, jobId, wuBasePath); + + String encodedSizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit).encode(); + String expectedFileName = Id.Task.PREFIX + "_" + encodedSizeInfo + "_789" + JobLauncherUtils.WORK_UNIT_FILE_EXTENSION; + Assert.assertEquals(resultPath, new Path(wuBasePath, expectedFileName)); + } + + @Test + public void testCalcNextPathMultiWorkUnit() { + MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty(); + for (int i = 0; i < 5; i++) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" + i); + multiWorkUnitA.addWorkUnit(workUnit); + } + MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty(); + for (int i = 0; i < 3; i++) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" + i); + multiWorkUnitB.addWorkUnit(workUnit); + } + + JobLauncherUtils.WorkUnitPathCalculator pathCalc = new 
JobLauncherUtils.WorkUnitPathCalculator(); + Path resultPathA0 = pathCalc.calcNextPath(multiWorkUnitA, wuPathJobId, wuBasePath); + Path resultPathB1 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId, wuBasePath); + Path resultPathB2 = pathCalc.calcNextPath(multiWorkUnitB, wuPathJobId, wuBasePath); + + String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" + wuPathJobId.replace("task_", "") + "_0" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" + wuPathJobId.replace("task_", "") + "_1" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" + wuPathJobId.replace("task_", "") + "_2" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + Assert.assertEquals(resultPathA0, new Path(wuBasePath, expectedFileNameA0)); + Assert.assertEquals(resultPathB1, new Path(wuBasePath, expectedFileNameB1)); + Assert.assertEquals(resultPathB2, new Path(wuBasePath, expectedFileNameB2)); + } + + @Test + public void testCalcNextPathWithTunneledSizeInfoMultiWorkUnit() { + MultiWorkUnit multiWorkUnitA = MultiWorkUnit.createEmpty(); + for (int i = 0; i < 5; i++) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_A" + i); + workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i + 1) * 75)); + multiWorkUnitA.addWorkUnit(workUnit); + } + MultiWorkUnit multiWorkUnitB = MultiWorkUnit.createEmpty(); + for (int i = 0; i < 3; i++) { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ConfigurationKeys.TASK_ID_KEY, Id.Task.PREFIX + "_B" + i); + workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, String.valueOf((i + 1) * 66)); + multiWorkUnitB.addWorkUnit(workUnit); + } + + JobLauncherUtils.WorkUnitPathCalculator pathCalc = new JobLauncherUtils.WorkUnitPathCalculator(); + Path resultPathA0 = pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitA, wuPathJobId, wuBasePath); + Path resultPathB1 = 
pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId, wuBasePath); + Path resultPathB2 = pathCalc.calcNextPathWithTunneledSizeInfo(multiWorkUnitB, wuPathJobId, wuBasePath); + + String encodedSizeInfoA = WorkUnitSizeInfo.forWorkUnit(multiWorkUnitA).encode(); + String encodedSizeInfoB = WorkUnitSizeInfo.forWorkUnit(multiWorkUnitB).encode(); + String expectedFileNameA0 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoA + "_0" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + String expectedFileNameB1 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB + "_1" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + String expectedFileNameB2 = Id.MultiTask.PREFIX + "_" + encodedSizeInfoB + "_2" + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION; + Assert.assertEquals(resultPathA0, new Path(wuBasePath, expectedFileNameA0)); + Assert.assertEquals(resultPathB1, new Path(wuBasePath, expectedFileNameB1)); + Assert.assertEquals(resultPathB2, new Path(wuBasePath, expectedFileNameB2)); + } + @Test public void testDeleteStagingData() throws IOException { FileSystem fs = FileSystem.getLocal(new Configuration()); diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java new file mode 100644 index 00000000000..ab2b689ea88 --- /dev/null +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/WorkUnitSizeInfoTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util; + +import java.util.ArrayList; +import java.util.List; + +import java.util.Optional; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.service.ServiceConfigKeys; + + +/** Tests for {@link WorkUnitSizeInfo}. */ +public class WorkUnitSizeInfoTest { + + @Test + public void testMultiWorkUnitSizeInfo() { + List workUnits = new ArrayList<>(); + long totalSize = 0; + for (int i = 1; i <= 20; i++) { + long size = i * 100L; + totalSize += size; + WorkUnit wu = WorkUnit.createEmpty(); + wu.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size); + workUnits.add(wu); + } + MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty(); + multiWorkUnit.addWorkUnits(workUnits); + + double expectedMean = totalSize * 1.0 / workUnits.size(); + + WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(multiWorkUnit); + + Assert.assertEquals(sizeInfo.getNumConstituents(), 20); + Assert.assertEquals(sizeInfo.getTotalSize(), totalSize); + Assert.assertEquals(sizeInfo.getMedianSize(), 1100.0, 0.1); + Assert.assertEquals(sizeInfo.getMeanSize(), expectedMean, 0.1); + Assert.assertEquals(sizeInfo.getStddevSize(), 576.628, 0.1); + } + + @Test + public void testSingleWorkUnitSizeInfo() { + WorkUnit workUnit = WorkUnit.createEmpty(); + workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, 5432L); + + WorkUnitSizeInfo sizeInfo = WorkUnitSizeInfo.forWorkUnit(workUnit); + + 
Assert.assertEquals(sizeInfo.getNumConstituents(), 1); + Assert.assertEquals(sizeInfo.getTotalSize(), 5432L); + Assert.assertEquals(sizeInfo.getMedianSize(), 5432.0, 0.1); + Assert.assertEquals(sizeInfo.getMeanSize(), 5432.0, 0.1); + Assert.assertEquals(sizeInfo.getStddevSize(), 0.0, 0.1); + } + + @Test + public void testEncodeThenDecodeRoundTrip() { + WorkUnitSizeInfo original = new WorkUnitSizeInfo(20, 21000L, 1100.0, 1050.0, 576.628); + String encoded = original.encode(); + + Assert.assertEquals(encoded, "n=20-total=21000-median=1100.00-mean=1050.00-stddev=576.63"); + + Optional optDecoded = WorkUnitSizeInfo.decode(encoded); + Assert.assertTrue(optDecoded.isPresent()); + WorkUnitSizeInfo decoded = optDecoded.get(); + Assert.assertEquals(decoded.getNumConstituents(), original.getNumConstituents()); + Assert.assertEquals(decoded.getTotalSize(), original.getTotalSize()); + Assert.assertEquals(decoded.getMedianSize(), original.getMedianSize(), 0.1); + Assert.assertEquals(decoded.getMeanSize(), original.getMeanSize(), 0.1); + Assert.assertEquals(decoded.getStddevSize(), original.getStddevSize(), 0.1); + } + + @Test + public void testDecodeFromIntFormattedDoubles() { + String encoded = "n=20-total=12345-median=1111-mean=617-stddev=543"; + + Optional optDecoded = WorkUnitSizeInfo.decode(encoded); + Assert.assertTrue(optDecoded.isPresent()); + WorkUnitSizeInfo decoded = optDecoded.get(); + Assert.assertEquals(decoded.getNumConstituents(), 20); + Assert.assertEquals(decoded.getTotalSize(), 12345L); + Assert.assertEquals(decoded.getMedianSize(), 1111.0); + Assert.assertEquals(decoded.getMeanSize(), 617.0); + Assert.assertEquals(decoded.getStddevSize(), 543.0); + } +} diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index 966ef824d7c..e4b328c5e14 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -220,6 +220,7 @@ ext.externalDependency = [ 
"org.slf4j:slf4j-log4j12:" + slf4jVersion ], "postgresConnector": "org.postgresql:postgresql:42.1.4", + "tdigest": "com.tdunning:t-digest:3.3", "testContainers": "org.testcontainers:testcontainers:1.17.3", "testContainersMysql": "org.testcontainers:mysql:1.17.3", "xz": "org.tukaani:xz:1.8"