Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 committed Jan 31, 2025
1 parent e69a77e commit 7a80eb7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {
public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
public static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
public static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2;
public static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB

/** this holds the current count of containers already requested for each worker profile */
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
private final Set<ContainerId> removedContainerIds;
public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile";
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
Expand Down Expand Up @@ -98,6 +101,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
ContainerId completedContainerId = containerStatus.getContainerId();
ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);

// Because callbacks are processed asynchronously, handleContainerCompletion() may be called before
// onContainersAllocated(), in which case the containerId is missing from containerMap.
// We use removedContainerIds to remember these containers and remove them from containerMap later,
// when the reviseWorkforcePlanAndRequestNewContainers method is called.
if (completedContainerInfo == null) {
log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()",
completedContainerId);
Expand All @@ -106,13 +113,11 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
}

log.info("Container {} running profile {} completed with exit status {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()
);
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus());

if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
log.info("Container {} running profile {} completed with diagnostics: {}",
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()
);
completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics());
}

if (this.shutdownInProgress) {
Expand All @@ -131,7 +136,7 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
completedContainerId, completedContainerInfo.getWorkerProfileName());
requestContainersForWorkerProfile(workerProfile, 1);
break;
case(137): // General OOM exit status
case(GENERAL_OOM_EXIT_STATUS_CODE):
case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
Expand Down Expand Up @@ -232,7 +237,12 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont

// Request a replacement container
int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable
int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER;
if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.",
MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
return;
}
Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(),
new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class YarnService extends AbstractIdleService {
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
private final NMClientAsync nmClientAsync;
private final ExecutorService containerLaunchExecutor;
private final boolean containerHostAffinityEnabled;
private final String containerTimezone;
private final String proxyJvmArgs;

Expand Down Expand Up @@ -197,8 +196,6 @@ public YarnService(Config config, String applicationName, String applicationId,
this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
this.nmClientAsync.init(this.yarnConfiguration);

this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);

this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY;

Expand Down Expand Up @@ -536,26 +533,6 @@ private String buildContainerCommand(Container container, String workerProfileNa
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}

/**
 * Decide whether the replacement for a completed container should be requested on the
 * node the completed container ran on.
 *
 * <p>Exit statuses {@link ContainerExitStatus#DISKS_FAILED} and
 * {@link ContainerExitStatus#ABORTED} are treated as signs of a disk or node failure
 * (an abort is most likely node-related because the application itself will not release
 * containers), so the replacement should go elsewhere. For every other exit status the
 * answer is whatever the host-affinity setting says.
 *
 * @param containerExitStatus exit status reported for the completed container
 * @return {@code true} only if the exit status does not point at a node problem and
 *         host affinity is enabled
 */
private boolean shouldStickToTheSameNode(int containerExitStatus) {
  boolean nodeLikelyAtFault = containerExitStatus == ContainerExitStatus.DISKS_FAILED
      || containerExitStatus == ContainerExitStatus.ABORTED;
  // Only honor host affinity when the node itself is not the suspected cause.
  return !nodeLikelyAtFault && this.containerHostAffinityEnabled;
}

/**
* Handle the completion of a container.
* Just removes the containerId from {@link #containerMap}
Expand Down

0 comments on commit 7a80eb7

Please sign in to comment.