diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index fa991b39f7..45c36b86b1 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -17,7 +17,7 @@ package org.apache.gobblin.temporal.yarn; -import java.util.Collections; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -60,6 +60,7 @@ public class DynamicScalingYarnService extends YarnService { /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; private final Set removedContainerIds; + public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -164,6 +165,9 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List 0) { this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan has up-to-date setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } iterator.remove(); } @@ -214,11 +218,16 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + List scalingDirectives = new
ArrayList<>(); + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan has up-to-date setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } // Request a replacement container @@ -227,13 +236,13 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); - ScalingDirective scalingDirective = new ScalingDirective( - workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(), + scalingDirectives.add(new ScalingDirective( + DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, System.currentTimeMillis(), optProfileDerivation - ); - reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective)); + )); + reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); } }