From 87c8ab4a4297c213058fabfb1752f33a2aa47c39 Mon Sep 17 00:00:00 2001 From: Vivek Rai <43493515+Blazer-007@users.noreply.github.com> Date: Mon, 6 Jan 2025 19:08:05 +0530 Subject: [PATCH] [GOBBLIN-2187] Prevent NaN while generating `WorkUnitsSizeSummary.Distillation` (#4090) --- .../activity/impl/GenerateWorkUnitsImpl.java | 6 +++- .../ddm/work/WorkUnitsSizeSummary.java | 4 +-- .../impl/GenerateWorkUnitsImplTest.java | 29 +++++++++++++++++-- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index e0fa2ebb5e7..63d9b6b1ea3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -95,7 +95,11 @@ public WorkUnitsSizeSummary asSizeSummary(int numQuantiles) { private static List getQuantiles(TDigest digest, int numQuantiles) { List quantileMinSizes = Lists.newArrayList(); for (int i = 1; i <= numQuantiles; i++) { - quantileMinSizes.add(digest.quantile((i * 1.0) / numQuantiles)); + double currQuantileMinSize = digest.quantile((i * 1.0) / numQuantiles); + if (Double.isNaN(currQuantileMinSize)) { + currQuantileMinSize = 0.0; + } + quantileMinSizes.add(currQuantileMinSize); } return quantileMinSizes; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java index 16a20516043..64172ccd6e9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java @@ -78,12 +78,12 @@ public Distillation distill() { @JsonIgnore // (because no-arg method resembles 'java bean property') public double getTopLevelWorkUnitsMeanSize() { - return this.totalSize * 1.0 / this.topLevelWorkUnitsCount; + return this.topLevelWorkUnitsCount == 0 ? 0.0 : (this.totalSize * 1.0 / this.topLevelWorkUnitsCount); } @JsonIgnore // (because no-arg method resembles 'java bean property') public double getConstituentWorkUnitsMeanSize() { - return this.totalSize * 1.0 / this.constituentWorkUnitsCount; + return this.constituentWorkUnitsCount == 0 ? 0.0 : (this.totalSize * 1.0 / this.constituentWorkUnitsCount); } @JsonIgnore // (because no-arg method resembles 'java bean property') diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java index 8c94783a7dd..a7790c423db 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java @@ -148,8 +148,8 @@ public void testDigestWorkUnitsSize() { Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsCount(), expectedNumConstituentWorkUnits); Assert.assertEquals(wuSizeInfo.getQuantilesCount(), numQuantilesDesired); Assert.assertEquals(wuSizeInfo.getQuantilesWidth(), 1.0 / expectedNumTopLevelWorkUnits); - Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeInfo` param - Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeInfo` param + Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeSummary` param + Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeSummary` param // expected sizes for (n=5) top-level non-multi-WUs: (1x) 0, (1x) 100, (1x) 200, (1x) 300, (1x) 400 // expected sizes for (n=15) top-level multi-WUs: [a] (4x) 70; [b] (4x) 210 (= 70+140); [c] (4x) 420 (= 70+140+210); [d] (3x) 700 (= 70+140+210+280) @@ -174,6 +174,31 @@ public void testDigestWorkUnitsSize() { 400.0 }); // with only one 20-quantile remaining, non-MWU [5] completes the "100-percentile" (all WUs) } + @Test + public void testDigestWorkUnitsSizeWithEmptyWorkUnits() { + List workUnits = new ArrayList<>(); + GenerateWorkUnitsImpl.WorkUnitsSizeDigest wuSizeDigest = GenerateWorkUnitsImpl.digestWorkUnitsSize(workUnits); + + Assert.assertEquals(wuSizeDigest.getTotalSize(), 0L); + Assert.assertEquals(wuSizeDigest.getTopLevelWorkUnitsSizeDigest().size(), 0); + Assert.assertEquals(wuSizeDigest.getConstituentWorkUnitsSizeDigest().size(), 0); + + int numQuantilesDesired = 10; + WorkUnitsSizeSummary wuSizeInfo = wuSizeDigest.asSizeSummary(numQuantilesDesired); + Assert.assertEquals(wuSizeInfo.getTotalSize(), 0L); + Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsCount(), 0); + Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsCount(), 0); + Assert.assertEquals(wuSizeInfo.getQuantilesCount(), numQuantilesDesired); + Assert.assertEquals(wuSizeInfo.getQuantilesWidth(), 1.0 / numQuantilesDesired); + Assert.assertEquals(wuSizeInfo.getTopLevelQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeSummary` param + Assert.assertEquals(wuSizeInfo.getConstituentQuantilesMinSizes().size(), numQuantilesDesired); // same as `asSizeSummary` param + Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMeanSize(), 0.0); + Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsMeanSize(), 0.0); + Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMeanSize(), 0.0); + Assert.assertEquals(wuSizeInfo.getTopLevelWorkUnitsMedianSize(), 0.0); + Assert.assertEquals(wuSizeInfo.getConstituentWorkUnitsMedianSize(), 0.0); + } + public static WorkUnit createWorkUnitOfSize(long size) { WorkUnit workUnit = WorkUnit.createEmpty(); workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, size);