Skip to content

Commit

Permalink
initial dynamicscalingyarnservice changes for handleContainerCompletion
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 committed Jan 13, 2025
1 parent a0cef28 commit e8f7574
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@

package org.apache.gobblin.temporal.yarn;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
import org.apache.gobblin.temporal.dynamic.WorkerProfile;
Expand All @@ -47,13 +59,17 @@ public class DynamicScalingYarnService extends YarnService {
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
private final Set<ContainerId> removedContainerIds;
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
this.removedContainerIds = ConcurrentHashMap.newKeySet();
this.profileNameSuffixGenerator = new AtomicLong();
}

@Override
Expand All @@ -62,6 +78,71 @@ protected synchronized void requestInitialContainers() {
requestNewContainersForStaffingDeltas(deltas);
}

/**
* Handle the completion of a container. A new container will be requested to replace the one
* that just exited depending on the exit status.
* <p>
* A container completes in either of the following conditions:
* <ol>
* <li> The container gets stopped by the ApplicationMaster. </li>
* <li> Some error happens in the container and caused the container to exit </li>
* <li> The container gets preempted by the ResourceManager </li>
* <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li>
* </ol>
* A replacement container is needed in all except the first case.
* </p>
*/
@Override
protected void handleContainerCompletion(ContainerStatus containerStatus) {
ContainerId completedContainerId = containerStatus.getContainerId();
ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);

if (completedContainerInfo == null) {
log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
completedContainerId);
this.removedContainerIds.add(completedContainerId);
return;
}

log.info("Container {} running profile {} completed with exit status {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()
);

if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
log.info("Container {} running profile {} completed with diagnostics: {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()
);
}

if (this.shutdownInProgress) {
log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
return;
}

WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();

switch (containerStatus.getExitStatus()) {
case(ContainerExitStatus.ABORTED):
handleAbortedContainer(completedContainerId, completedContainerInfo);
break;
case(ContainerExitStatus.PREEMPTED):
log.info("Container {} for profile {} preempted, starting to launching a replacement container",
completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(workerProfile, 1);
break;
case(137): // General OOM exit status
case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
break;
case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo);
break;
default:
break;
}
}

/**
* Revises the workforce plan and requests new containers based on the given scaling directives.
*
Expand All @@ -71,29 +152,88 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}

// Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated()
Iterator<ContainerId> iterator = removedContainerIds.iterator();
while (iterator.hasNext()) {
ContainerId containerId = iterator.next();
ContainerInfo containerInfo = this.containerMap.remove(containerId);
if (containerInfo != null) {
WorkerProfile workerProfile = containerInfo.getWorkerProfile();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
System.currentTimeMillis());
}
iterator.remove();
}
}

this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
if (profileDelta.getDelta() > 0) { // scale up!
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
int delta = profileDelta.getDelta();
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int delta = profileDelta.getDelta();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
if (delta > 0) { // scale up!
log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
} else if (profileDelta.getDelta() < 0) { // scale down!
// TODO: Decide how to handle negative deltas
log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
profileDelta.getProfile().getName(), profileDelta.getDelta());
} else if (delta < 0) { // scale down!
log.info("Releasing {} containers for profile {} having currently {} containers", -delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
releaseContainersForWorkerProfile(profileName, delta);
// update our staffing after releasing containers
int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis());
} // else, already at staffing plan (or at least have requested, so in-progress)
});
}

private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
// Case 1 : Container release requested while scaling down
if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) {
log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName());
this.releasedContainerCache.invalidate(completedContainerId);
return;
}

// Case 2 : Container release was not requested, we need to request a replacement container
log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
}

private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
completedContainerId, completedContainerInfo.getWorkerProfileName());

WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
// Update the current staffing to reflect the container that exited with OOM
int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis());
}

// Request a replacement container
int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable
Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(),
new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))
));
ScalingDirective scalingDirective = new ScalingDirective(
workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(),
1,
System.currentTimeMillis(),
optProfileDerivation
);
reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective));
}

}
Loading

0 comments on commit e8f7574

Please sign in to comment.