diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 45c36b86b1..ea77f816a3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -54,13 +54,16 @@ */ @Slf4j public class DynamicScalingYarnService extends YarnService { + public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; + public static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; + public static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; + public static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; private final Set removedContainerIds; - public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -98,6 +101,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { ContainerId completedContainerId = containerStatus.getContainerId(); ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); + // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion() + // is called before onContainersAllocated(), resulting in the containerId missing from the containerMap.
+ // We use removedContainerIds to remember these containers and remove them from containerMap later + // when the reviseWorkforcePlanAndRequestNewContainers method is invoked if (completedContainerInfo == null) { log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()", completedContainerId); @@ -106,13 +113,11 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { } log.info("Container {} running profile {} completed with exit status {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus() - ); + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()); if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { log.info("Container {} running profile {} completed with diagnostics: {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics() - ); + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()); } if (this.shutdownInProgress) { @@ -131,7 +136,7 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { completedContainerId, completedContainerInfo.getWorkerProfileName()); requestContainersForWorkerProfile(workerProfile, 1); break; - case(137): // General OOM exit status + case(GENERAL_OOM_EXIT_STATUS_CODE): case(ContainerExitStatus.KILLED_EXCEEDED_VMEM): case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); @@ -232,7 +237,12 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont // Request a replacement container int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable + int 
newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; + if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.", + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + return; + } Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 6b6521edf4..9142635d53 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -137,7 +137,6 @@ class YarnService extends AbstractIdleService { private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; - private final boolean containerHostAffinityEnabled; private final String containerTimezone; private final String proxyJvmArgs; @@ -197,8 +196,6 @@ public YarnService(Config config, String applicationName, String applicationId, this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); - this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); - this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? 
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -536,26 +533,6 @@ private String buildContainerCommand(Container container, String workerProfileNa containerProcessName).append(".").append(ApplicationConstants.STDERR).toString(); } - /** - * Check the exit status of a completed container and see if the replacement container - * should try to be started on the same node. Some exit status indicates a disk or - * node failure and in such cases the replacement container should try to be started on - * a different node. - */ - private boolean shouldStickToTheSameNode(int containerExitStatus) { - switch (containerExitStatus) { - case ContainerExitStatus.DISKS_FAILED: - return false; - case ContainerExitStatus.ABORTED: - // Mostly likely this exit status is due to node failures because the - // application itself will not release containers. - return false; - default: - // Stick to the same node for other cases if host affinity is enabled. - return this.containerHostAffinityEnabled; - } - } - /** * Handle the completion of a container. * Just removes the containerId from {@link #containerMap}