From dee3fe4c4e533eb9eecb0d00918bc9375ee9cd91 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 20 Feb 2025 12:30:55 +0530 Subject: [PATCH] added a comment --- .../gobblin/temporal/yarn/DynamicScalingYarnService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 1ba66d674f..0010a45ff8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -59,6 +59,7 @@ public class DynamicScalingYarnService extends YarnService { protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB + private static final int EPSILON_MIILIS = 1; /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; @@ -235,10 +236,10 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 1); + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis); // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 2)); + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis)); } // Request a replacement container @@ -256,7 +257,7 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont scalingDirectives.add(new ScalingDirective( DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, - currTimeMillis + 3, + currTimeMillis + EPSILON_MIILIS, // Each scaling directive should have a newer timestamp than the previous one optProfileDerivation )); reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);