Skip to content

Commit

Permalink
fix updating workforce plan while containerCompletion
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 committed Jan 14, 2025
1 parent e8f7574 commit 089e2e0
Showing 1 changed file with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.gobblin.temporal.yarn;

import java.util.Collections;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -60,6 +60,7 @@ public class DynamicScalingYarnService extends YarnService {
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
private final Set<ContainerId> removedContainerIds;
public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
Expand Down Expand Up @@ -164,6 +165,9 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
System.currentTimeMillis());
// Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile,
// otherwise extra containers will be requested when calculating deltas
scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
}
iterator.remove();
}
Expand Down Expand Up @@ -214,11 +218,16 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont
log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
completedContainerId, completedContainerInfo.getWorkerProfileName());

List<ScalingDirective> scalingDirectives = new ArrayList<>();

WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
// Update the current staffing to reflect the container that exited with OOM
int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis());
// Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile,
// otherwise extra containers will be requested when calculating deltas
scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
}

// Request a replacement container
Expand All @@ -227,13 +236,13 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont
Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(),
new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))
));
ScalingDirective scalingDirective = new ScalingDirective(
workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(),
scalingDirectives.add(new ScalingDirective(
DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(),
1,
System.currentTimeMillis(),
optProfileDerivation
);
reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective));
));
reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}

}

0 comments on commit 089e2e0

Please sign in to comment.