From e8f757412f64fca8d0aa2866fd8c6269aa997739 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 13 Jan 2025 15:43:52 +0530 Subject: [PATCH 01/12] initial dynamicscalingyarnservice changes for handleContainerCompletion --- .../yarn/DynamicScalingYarnService.java | 158 +++++++++- .../gobblin/temporal/yarn/YarnService.java | 286 ++++-------------- .../temporal/yarn/YarnServiceTest.java | 21 +- 3 files changed, 217 insertions(+), 248 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 0720017b852..fa991b39f78 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -17,10 +17,20 @@ package org.apache.gobblin.temporal.yarn; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.eventbus.EventBus; @@ -28,6 +38,8 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; import org.apache.gobblin.temporal.dynamic.ScalingDirective; import org.apache.gobblin.temporal.dynamic.StaffingDeltas; import org.apache.gobblin.temporal.dynamic.WorkerProfile; @@ -47,6 +59,8 @@ public class 
DynamicScalingYarnService extends YarnService { private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; + private final Set removedContainerIds; + private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -54,6 +68,8 @@ public DynamicScalingYarnService(Config config, String applicationName, String a this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); + this.removedContainerIds = ConcurrentHashMap.newKeySet(); + this.profileNameSuffixGenerator = new AtomicLong(); } @Override @@ -62,6 +78,71 @@ protected synchronized void requestInitialContainers() { requestNewContainersForStaffingDeltas(deltas); } + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited depending on the exit status. + *

<p> + * A container completes in either of the following conditions: + * <ol> + * <li> The container gets stopped by the ApplicationMaster. </li> + * <li> Some error happens in the container and caused the container to exit </li> + * <li> The container gets preempted by the ResourceManager </li> + * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li> + * </ol> + * A replacement container is needed in all except the first case. + * </p>
+ */ + @Override + protected void handleContainerCompletion(ContainerStatus containerStatus) { + ContainerId completedContainerId = containerStatus.getContainerId(); + ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); + + if (completedContainerInfo == null) { + log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()", + completedContainerId); + this.removedContainerIds.add(completedContainerId); + return; + } + + log.info("Container {} running profile {} completed with exit status {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus() + ); + + if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { + log.info("Container {} running profile {} completed with diagnostics: {}", + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics() + ); + } + + if (this.shutdownInProgress) { + log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId); + return; + } + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + + switch (containerStatus.getExitStatus()) { + case(ContainerExitStatus.ABORTED): + handleAbortedContainer(completedContainerId, completedContainerInfo); + break; + case(ContainerExitStatus.PREEMPTED): + log.info("Container {} for profile {} preempted, starting to launching a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + requestContainersForWorkerProfile(workerProfile, 1); + break; + case(137): // General OOM exit status + case(ContainerExitStatus.KILLED_EXCEEDED_VMEM): + case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): + handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); + break; + case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed + log.info("Exit status 
1.CompletedContainerInfo = {}", completedContainerInfo); + break; + default: + break; + } + } + /** * Revises the workforce plan and requests new containers based on the given scaling directives. * @@ -71,6 +152,23 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List iterator = removedContainerIds.iterator(); + while (iterator.hasNext()) { + ContainerId containerId = iterator.next(); + ContainerInfo containerInfo = this.containerMap.remove(containerId); + if (containerInfo != null) { + WorkerProfile workerProfile = containerInfo.getWorkerProfile(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, + System.currentTimeMillis()); + } + iterator.remove(); + } + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); @@ -78,22 +176,64 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List { - if (profileDelta.getDelta() > 0) { // scale up! - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - int delta = profileDelta.getDelta(); + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int delta = profileDelta.getDelta(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + if (delta > 0) { // scale up! 
log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (profileDelta.getDelta() < 0) { // scale down! - // TODO: Decide how to handle negative deltas - log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", - profileDelta.getProfile().getName(), profileDelta.getDelta()); + } else if (delta < 0) { // scale down! + log.info("Releasing {} containers for profile {} having currently {} containers", -delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + releaseContainersForWorkerProfile(profileName, delta); + // update our staffing after releasing containers + int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); + this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } + private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + // Case 1 : Container release requested while scaling down + if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { + log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); + this.releasedContainerCache.invalidate(completedContainerId); + return; + } + + // Case 2 : Container release was not requested, we need to request a replacement container + log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + 
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); + } + + private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { + log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", + completedContainerId, completedContainerInfo.getWorkerProfileName()); + + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + // Update the current staffing to reflect the container that exited with OOM + int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); + if (currNumContainers > 0) { + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + } + + // Request a replacement container + int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable + Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), + new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) + )); + ScalingDirective scalingDirective = new ScalingDirective( + workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(), + 1, + System.currentTimeMillis(), + optProfileDerivation + ); + reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective)); + } + } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index ec4da215a63..95220aa99c8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java 
@@ -81,7 +81,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; import com.google.common.io.Closer; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; @@ -106,10 +105,8 @@ import org.apache.gobblin.yarn.GobblinYarnEventConstants; import org.apache.gobblin.yarn.GobblinYarnMetricTagNames; import org.apache.gobblin.yarn.YarnHelixUtils; -import org.apache.gobblin.yarn.event.ContainerReleaseRequest; -import org.apache.gobblin.yarn.event.ContainerShutdownRequest; -import org.apache.gobblin.yarn.event.NewContainerRequest; import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; /** * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration, @@ -124,13 +121,9 @@ class YarnService extends AbstractIdleService { private static final Logger LOGGER = LoggerFactory.getLogger(YarnService.class); - private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN"; - private final String applicationName; private final String applicationId; private final String appViewAcl; - //Default helix instance tag derived from cluster level config - private final String helixInstanceTags; protected final Config config; private final EventBus eventBus; @@ -146,11 +139,7 @@ class YarnService extends AbstractIdleService { private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; - private final int requestedContainerMemoryMbs; - private final int requestedContainerCores; private final boolean containerHostAffinityEnabled; - - private final int helixInstanceMaxRetries; private final String containerTimezone; private final String proxyJvmArgs; @@ -164,34 +153,18 @@ class YarnService extends AbstractIdleService { private final Object 
allContainersStopped = new Object(); - // A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag - @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private final ConcurrentMap containerMap = Maps.newConcurrentMap(); + // A map from container IDs to Container instances, WorkerProfile Name and WorkerProfile Object + protected final ConcurrentMap containerMap = new ConcurrentHashMap<>(); // A cache of the containers with an outstanding container release request. // This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested // release. - @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private final Cache releasedContainerCache; - - // A map from Helix instance names to the number times the instances are retried to be started - private final ConcurrentMap helixInstanceRetryCount = Maps.newConcurrentMap(); - - // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put - // into the set if the container running the instance completes. Unused Helix - // instance names get picked up when replacement containers get allocated. 
- private final Set unusedHelixInstanceNames = ConcurrentHashMap.newKeySet(); - - // The map from helix tag to allocated container count - private final ConcurrentMap allocatedContainerCountMap = Maps.newConcurrentMap(); - private final ConcurrentMap removedContainerID = Maps.newConcurrentMap(); + protected final Cache releasedContainerCache; private final AtomicInteger priorityNumGenerator = new AtomicInteger(0); private final Map resourcePriorityMap = new HashMap<>(); - private volatile boolean shutdownInProgress = false; + protected volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L; @@ -226,14 +199,8 @@ public YarnService(Config config, String applicationName, String applicationId, this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); - this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); - this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES); - this.helixInstanceTags = ConfigUtils.getString(config, - GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG); - this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? 
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -257,53 +224,10 @@ public YarnService(Config config, String applicationName, String applicationId, } - @SuppressWarnings("unused") - @Subscribe - public void handleNewContainerRequest(NewContainerRequest newContainerRequest) { - if (!this.maxResourceCapacity.isPresent()) { - LOGGER.error(String.format( - "Unable to handle new container request as maximum resource capacity is not available: " - + "[memory (MBs) requested = %d, vcores requested = %d]", this.requestedContainerMemoryMbs, - this.requestedContainerCores)); - return; - } - requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()), - newContainerRequest.getResource()); - } - protected NMClientCallbackHandler getNMClientCallbackHandler() { return new NMClientCallbackHandler(); } - @SuppressWarnings("unused") - @Subscribe - public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) { - for (Container container : containerShutdownRequest.getContainers()) { - LOGGER.info(String.format("Stopping container %s running on %s", container.getId(), container.getNodeId())); - this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); - } - } - - /** - * Request the Resource Manager to release the container - * @param containerReleaseRequest containers to release - */ - @Subscribe - public void handleContainerReleaseRequest(ContainerReleaseRequest containerReleaseRequest) { - for (Container container : containerReleaseRequest.getContainers()) { - LOGGER.info(String.format("Releasing container %s running on %s", container.getId(), container.getNodeId())); - - // Record that this container was explicitly released so that a new one is not spawned to replace it - // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() - // can check for the container id 
and skip spawning a replacement container. - // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently - // with the release call. So in some cases a replacement container may have already been spawned before - // the container is put into the black list. - this.releasedContainerCache.put(container.getId(), ""); - this.amrmClientAsync.releaseAssignedContainer(container.getId()); - } - } - @Override protected synchronized void startUp() throws Exception { LOGGER.info("Starting the TemporalYarnService"); @@ -335,8 +259,8 @@ protected void shutDown() throws IOException { // Stop the running containers for (ContainerInfo containerInfo : this.containerMap.values()) { - LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(), - containerInfo.getHelixParticipantId()); + LOGGER.info("Stopping container {} running worker profile {}", containerInfo.getContainer().getId(), + containerInfo.getWorkerProfileName()); this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId()); } @@ -411,10 +335,24 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } - private void requestContainer(Optional preferredNode, Optional resourceOptional) { - Resource desiredResource = resourceOptional.or(Resource.newInstance( - this.requestedContainerMemoryMbs, this.requestedContainerCores)); - requestContainer(preferredNode, desiredResource, Optional.absent()); + protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) { + Iterator> containerMapIterator = this.containerMap.entrySet().iterator(); + while (containerMapIterator.hasNext() && numContainers > 0) { + Map.Entry entry = containerMapIterator.next(); + if 
(entry.getValue().getWorkerProfile().getName().equals(profileName)) { + ContainerId containerId = entry.getKey(); + LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName)); + // Record that this container was explicitly released so that a new one is not spawned to replace it + // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() + // can check for the container id and skip spawning a replacement container. + // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently + // with the release call. So in some cases a replacement container may have already been spawned before + // the container is put into the black list. + this.releasedContainerCache.put(containerId, ""); + this.amrmClientAsync.releaseAssignedContainer(containerId); + numContainers--; + } + } } /** @@ -424,7 +362,7 @@ private void requestContainer(Optional preferredNode, Optional * @param resource */ protected void requestContainers(int numContainers, Resource resource, Optional optAllocationRequestId) { - LOGGER.info("Requesting {} containers with resource={} and allocation request id = {}", numContainers, resource, optAllocationRequestId); + LOGGER.info("Requesting {} containers with resource = {} and allocation request id = {}", numContainers, resource, optAllocationRequestId); IntStream.range(0, numContainers) .forEach(i -> requestContainer(Optional.absent(), resource, optAllocationRequestId)); } @@ -553,16 +491,7 @@ protected ByteBuffer getSecurityTokens() throws IOException { } @VisibleForTesting - protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { - long allocationRequestId = container.getAllocationRequestId(); - WorkerProfile workerProfile = Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId)) - .or(() -> { - LOGGER.warn("No Worker 
Profile found for {}, so falling back to default", allocationRequestId); - return this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { - LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); - return new WorkerProfile(this.config); - }); - }); + protected String buildContainerCommand(Container container, String workerProfileName, WorkerProfile workerProfile) { Config workerProfileConfig = workerProfile.getConfig(); double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig, @@ -574,13 +503,13 @@ protected String buildContainerCommand(Container container, String helixParticip GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && workerJvmMemoryXmxRatio <= 1, - workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + + workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); long containerMemoryMbs = container.getResource().getMemorySize(); Preconditions.checkArgument(workerJvmMemoryOverheadMbs < containerMemoryMbs * workerJvmMemoryXmxRatio, - workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + + workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); @@ -602,14 +531,8 @@ protected String buildContainerCommand(Container container, String helixParticip .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME) - .append(" ").append(this.applicationId) - .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) - .append(" ").append(helixParticipantId); + .append(" ").append(this.applicationId); - if (!Strings.isNullOrEmpty(helixInstanceTag)) { - containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) - .append(" ").append(helixInstanceTag); - } return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( @@ -637,103 +560,11 @@ private boolean shouldStickToTheSameNode(int containerExitStatus) { } /** - * Handle the completion of a container. A new container will be requested to replace the one - * that just exited. Depending on the exit status and if container host affinity is enabled, - * the new container may or may not try to be started on the same node. - *

- * A container completes in either of the following conditions: 1) some error happens in the - * container and caused the container to exit, 2) the container gets killed due to some reason, - * for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets - * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster. - * A replacement container is needed in all but the last case. + * Handle the completion of a container. + * Just removes the containerId from {@link #containerMap} */ protected void handleContainerCompletion(ContainerStatus containerStatus) { - ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId()); - //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might - //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the - //containerId missing from the containersMap. - // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method - if (completedContainerInfo == null) { - removedContainerID.putIfAbsent(containerStatus.getContainerId(), ""); - } - String completedInstanceName = UNKNOWN_HELIX_INSTANCE; - - String helixTag = completedContainerInfo == null ? 
helixInstanceTags : completedContainerInfo.getHelixTag(); - if (completedContainerInfo != null) { - allocatedContainerCountMap.get(helixTag).decrementAndGet(); - } - - LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d", - containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus())); - - if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) { - LOGGER.info(String.format("Received the following diagnostics information for container %s: %s", - containerStatus.getContainerId(), containerStatus.getDiagnostics())); - } - - switch(containerStatus.getExitStatus()) { - case(ContainerExitStatus.ABORTED): - if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) { - return; - } - break; - case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed - LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo); - break; - default: - break; - } - - if (this.shutdownInProgress) { - return; - } - if(completedContainerInfo != null) { - this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0)); - int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet(); - - // Populate event metadata - Optional> eventMetadataBuilder = Optional.absent(); - if (this.eventSubmitter.isPresent()) { - eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus)); - eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName); - eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + ""); - } - - if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) { - if (this.eventSubmitter.isPresent()) { - this.eventSubmitter.get() - .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build()); - } - - LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName); - return; - } - - // Add the Helix instance name of the completed container to the set of unused - // instance names so they can be reused by a replacement container. - LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); - this.unusedHelixInstanceNames.add(completedInstanceName); - - /** - * NOTE: logic for handling container failure is removed because {@link #YarnService} relies on the auto scaling manager - * to control the number of containers by polling helix for the current number of tasks - * Without that integration, that code requests too many containers when there are exceptions and overloads yarn - */ - } - } - - private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo, - String completedInstanceName) { - if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) { - LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId()); - if (completedContainerInfo != null) { - LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); - this.unusedHelixInstanceNames.add(completedInstanceName); - } - return true; - } - LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId()); - return false; + this.containerMap.remove(containerStatus.getContainerId()); } private ImmutableMap.Builder buildContainerStatusEventMetadata(ContainerStatus containerStatus) { @@ -782,33 +613,28 @@ public void onContainersCompleted(List statuses) { @Override public void onContainersAllocated(List containers) { for (final Container container : containers) { + long allocationRequestId = container.getAllocationRequestId(); + WorkerProfile workerProfile = 
Optional.fromNullable(workerProfileByAllocationRequestId.get(allocationRequestId)) + .or(() -> { + LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); + return workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { + LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); + return new WorkerProfile(config); + }); + }); + String containerId = container.getId().toString(); - String containerHelixTag = helixInstanceTags; if (eventSubmitter.isPresent()) { eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION, GobblinYarnMetricTagNames.CONTAINER_ID, containerId); } - LOGGER.info("Container {} has been allocated with resource {} for helix tag {}", - container.getId(), container.getResource(), containerHelixTag); - - //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live. - //Once we find a candidate instance, it is removed from the set. - String instanceName = null; - - //Ensure that updates to unusedHelixInstanceNames are visible to other threads that might concurrently - //invoke the callback on container allocation. 
- synchronized (this) { - Iterator iterator = unusedHelixInstanceNames.iterator(); - while (iterator.hasNext()) { - instanceName = iterator.next(); - } - } + LOGGER.info("Container {} has been allocated with resource {} for Worker Profile {}", + container.getId(), container.getResource(), WorkforceProfiles.renderName(workerProfile.getName())); - ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag); + ContainerInfo containerInfo = new ContainerInfo(container, + WorkforceProfiles.renderName(workerProfile.getName()), workerProfile); containerMap.put(container.getId(), containerInfo); - allocatedContainerCountMap.putIfAbsent(containerHelixTag, new AtomicInteger(0)); - allocatedContainerCountMap.get(containerHelixTag).incrementAndGet(); // Find matching requests and remove the request (YARN-660). We the scheduler are responsible // for cleaning up requests after allocation based on the design in the described ticket. @@ -969,26 +795,26 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { } } - // Class encapsulates Container instances, Helix participant IDs of the containers, Helix Tag, and + // Class encapsulates Container instance, WorkerProfile name to print, WorkerProfile, and // initial startup command @Getter class ContainerInfo { private final Container container; - private final String helixParticipantId; - private final String helixTag; + private final String workerProfileName; // Storing this to avoid calling WorkforceProfiles.renderName(workerProfile.getName()) while logging + private final WorkerProfile workerProfile; private final String startupCommand; - public ContainerInfo(Container container, String helixParticipantId, String helixTag) { + public ContainerInfo(Container container, String workerProfileName, WorkerProfile workerProfile) { this.container = container; - this.helixParticipantId = helixParticipantId; - this.helixTag = helixTag; - this.startupCommand = 
YarnService.this.buildContainerCommand(container, helixParticipantId, helixTag); + this.workerProfileName = workerProfileName; + this.workerProfile = workerProfile; + this.startupCommand = YarnService.this.buildContainerCommand(container, workerProfileName, workerProfile); } @Override public String toString() { - return String.format("ContainerInfo{ container=%s, helixParticipantId=%s, helixTag=%s, startupCommand=%s }", - container.getId(), helixParticipantId, helixTag, startupCommand); + return String.format("ContainerInfo{ container=%s, workerProfileName=%s, startupCommand=%s }", + container.getId(), workerProfileName, startupCommand); } } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 3c81316b85c..8d216450a34 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,8 +20,13 @@ import java.io.IOException; import java.net.URL; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -103,12 +108,6 @@ public void testBuildContainerCommand() throws Exception { .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) 
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); - Resource resource = Resource.newInstance(resourceMemoryMB, 2); - - Container mockContainer = Mockito.mock(Container.class); - Mockito.when(mockContainer.getResource()).thenReturn(resource); - Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L); - YarnService yarnService = new YarnService( config, "testApplicationName", @@ -118,9 +117,13 @@ public void testBuildContainerCommand() throws Exception { eventBus ); - yarnService.startUp(); - - String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); + WorkerProfile workerProfile = new WorkerProfile(config); + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), + 0), 0); + Resource resource = Resource.newInstance(resourceMemoryMB, 2); + Container container = Container.newInstance(containerId, null, null, resource, null, null); + YarnService.ContainerInfo containerInfo = yarnService.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, workerProfile); + String command = containerInfo.getStartupCommand(); Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); } } From 089e2e0b18ce902258811234f5e1d0700a911692 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 14 Jan 2025 19:30:30 +0530 Subject: [PATCH 02/12] fix updating workforce plan while containerCompletion --- .../yarn/DynamicScalingYarnService.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index fa991b39f78..45c36b86b1c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -17,7 +17,7 @@ package org.apache.gobblin.temporal.yarn; -import java.util.Collections; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -60,6 +60,7 @@ public class DynamicScalingYarnService extends YarnService { /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; private final Set removedContainerIds; + public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -164,6 +165,9 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List 0) { this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } iterator.remove(); } @@ -214,11 +218,16 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); + List scalingDirectives = new ArrayList<>(); + WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { 
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, + // otherwise extra containers will be requested when calculating deltas + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } // Request a replacement container @@ -227,13 +236,13 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); - ScalingDirective scalingDirective = new ScalingDirective( - workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(), + scalingDirectives.add(new ScalingDirective( + DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, System.currentTimeMillis(), optProfileDerivation - ); - reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective)); + )); + reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); } } From e69a77ea19353c0dbf4c08cd381bf281eef51022 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 21 Jan 2025 22:48:25 +0530 Subject: [PATCH 03/12] improved dummy tests --- .../apache/gobblin/temporal/yarn/YarnService.java | 5 +---- .../dynamic/DummyScalingDirectiveSource.java | 12 ++++++++++++ .../yarn/DynamicScalingYarnServiceManagerTest.java | 8 ++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 95220aa99c8..6b6521edf46 100644 --- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -70,7 +70,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -134,7 +133,6 @@ class YarnService extends AbstractIdleService { private final Optional gobblinMetrics; private final Optional eventSubmitter; - @VisibleForTesting @Getter(AccessLevel.PROTECTED) private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; @@ -490,8 +488,7 @@ protected ByteBuffer getSecurityTokens() throws IOException { } } - @VisibleForTesting - protected String buildContainerCommand(Container container, String workerProfileName, WorkerProfile workerProfile) { + private String buildContainerCommand(Container container, String workerProfileName, WorkerProfile workerProfile) { Config workerProfileConfig = workerProfile.getConfig(); double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig, diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java index 6bdfe46276f..8b6d0bf2702 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java @@ -70,6 +70,18 @@ public List getScalingDirectives() { new ScalingDirective("firstProfile", 5, currentTime), new ScalingDirective("secondProfile", 3, currentTime + 1) ); + } else if (currNumInvocations == 3) { + // changing set point to 0 for both profiles so that all containers should be released + return Arrays.asList( + new 
ScalingDirective("firstProfile", 0, currentTime), + new ScalingDirective("secondProfile", 0, currentTime + 1) + ); + } else if (currNumInvocations == 4) { + // increasing containers count for both profiles so that new containers should be launched + return Arrays.asList( + new ScalingDirective("firstProfile", 5, currentTime), + new ScalingDirective("secondProfile", 5, currentTime + 1) + ); } return new ArrayList<>(); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index c43a27fa768..96fc0fa1e5a 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -69,14 +69,14 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 3 + // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + Thread.sleep(7000); // 5 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); } @Test From 7a80eb7910a5a48908988ea799fbc5306cd393bb Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Fri, 31 Jan 2025 11:51:14 +0530 Subject: [PATCH 04/12] addressed review comments --- .../yarn/DynamicScalingYarnService.java | 24 +++++++++++++------ .../gobblin/temporal/yarn/YarnService.java | 23 ------------------ 2 files changed, 17 insertions(+), 30 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 45c36b86b1c..ea77f816a39 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -54,13 +54,16 @@ */ @Slf4j public class DynamicScalingYarnService extends YarnService { + public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; + public static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; + public static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; + public static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; private final Set removedContainerIds; - public static final String 
DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -98,6 +101,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { ContainerId completedContainerId = containerStatus.getContainerId(); ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); + // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion() + // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap. + // We use removedContainerIds to remember these containers and remove them from containerMap later + // when we call reviseWorkforcePlanAndRequestNewContainers method if (completedContainerInfo == null) { log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()", completedContainerId); @@ -106,13 +113,11 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { } log.info("Container {} running profile {} completed with exit status {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus() - ); + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()); if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { log.info("Container {} running profile {} completed with diagnostics: {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics() - ); + completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()); } if (this.shutdownInProgress) { @@ -131,7 +136,7 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { completedContainerId, 
completedContainerInfo.getWorkerProfileName()); requestContainersForWorkerProfile(workerProfile, 1); break; - case(137): // General OOM exit status + case(GENERAL_OOM_EXIT_STATUS_CODE): case(ContainerExitStatus.KILLED_EXCEEDED_VMEM): case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); @@ -232,7 +237,12 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont // Request a replacement container int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable + int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; + if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.", + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + return; + } Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 6b6521edf46..9142635d533 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -137,7 +137,6 @@ class YarnService extends AbstractIdleService { private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; - private final boolean containerHostAffinityEnabled; private final String 
containerTimezone; private final String proxyJvmArgs; @@ -197,8 +196,6 @@ public YarnService(Config config, String applicationName, String applicationId, this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); - this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); - this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -536,26 +533,6 @@ private String buildContainerCommand(Container container, String workerProfileNa containerProcessName).append(".").append(ApplicationConstants.STDERR).toString(); } - /** - * Check the exit status of a completed container and see if the replacement container - * should try to be started on the same node. Some exit status indicates a disk or - * node failure and in such cases the replacement container should try to be started on - * a different node. - */ - private boolean shouldStickToTheSameNode(int containerExitStatus) { - switch (containerExitStatus) { - case ContainerExitStatus.DISKS_FAILED: - return false; - case ContainerExitStatus.ABORTED: - // Mostly likely this exit status is due to node failures because the - // application itself will not release containers. - return false; - default: - // Stick to the same node for other cases if host affinity is enabled. - return this.containerHostAffinityEnabled; - } - } - /** * Handle the completion of a container. 
* Just removes the containerId from {@link #containerMap} From 90b7e6c66f6dc7da2287131319bb917e5a469d71 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 6 Feb 2025 10:14:23 +0530 Subject: [PATCH 05/12] addressed review comments --- .../yarn/DynamicScalingYarnService.java | 20 +++++++++++++------ .../gobblin/temporal/yarn/YarnService.java | 3 +++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index ea77f816a39..4d660fad529 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -54,10 +54,11 @@ */ @Slf4j public class DynamicScalingYarnService extends YarnService { - public static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; - public static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; - public static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; - public static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB + private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; + private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1; + private static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; + private static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; + private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; @@ -141,10 +142,15 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) { case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): 
handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); break; - case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed + case(LAUNCH_CONTAINER_FAILED_EXIT_CODE): log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo); break; + case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION: + case ContainerExitStatus.SUCCESS: + break; default: + log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}", + completedContainerId, containerStatus.getExitStatus(), completedContainerInfo); break; } } @@ -238,7 +244,9 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont // Request a replacement container int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; - if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS; + } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. 
Not requesting a replacement container.", MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); return; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 9142635d533..2818982baba 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -331,6 +331,7 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work } protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) { + int numContainersToRelease = numContainers; Iterator> containerMapIterator = this.containerMap.entrySet().iterator(); while (containerMapIterator.hasNext() && numContainers > 0) { Map.Entry entry = containerMapIterator.next(); @@ -348,6 +349,8 @@ protected synchronized void releaseContainersForWorkerProfile(String profileName numContainers--; } } + LOGGER.info("Released {} containers out of {} requested for profile {}", numContainersToRelease - numContainers, + numContainersToRelease, profileName); } /** From 4c7b019d72bab5c6a8f6c6296b80e150fac50fa6 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Fri, 7 Feb 2025 10:55:04 +0530 Subject: [PATCH 06/12] fixed typo --- .../temporal/yarn/DynamicScalingYarnServiceManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index 96fc0fa1e5a..8b0602f035f 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -70,11 +70,11 @@ public void 
testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list - // so the total number of invocations after three invocations should always be 5 + // so the total number of invocations after five invocations should always be 5 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(7000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times + Thread.sleep(7000); // 7 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); } From 29891fd62fc15123fdebf0c51e0a5110b6726401 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 10 Feb 2025 21:30:16 +0530 Subject: [PATCH 07/12] added test for handle container completion --- .../yarn/DynamicScalingYarnService.java | 30 +-- .../yarn/DynamicScalingYarnServiceTest.java | 206 +++++++++++++++++- 2 files changed, 215 insertions(+), 21 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 4d660fad529..579c53106d2 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -21,8 +21,8 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import 
java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.collections.CollectionUtils; @@ -56,15 +56,15 @@ public class DynamicScalingYarnService extends YarnService { private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1; - private static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; - private static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; + protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; + protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; - private final Set removedContainerIds; + private final Queue removedContainerIds; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -73,7 +73,7 @@ public DynamicScalingYarnService(Config config, String applicationName, String a this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); - this.removedContainerIds = ConcurrentHashMap.newKeySet(); + this.removedContainerIds = new ConcurrentLinkedQueue<>(); this.profileNameSuffixGenerator = new AtomicLong(); } @@ -232,32 +232,32 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont List scalingDirectives = new ArrayList<>(); 
WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + long currTimeMillis = System.currentTimeMillis(); // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 1); // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 2)); } // Request a replacement container int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; - if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { - newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS; - } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { - log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.", - MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Container {} already had max allowed memory {} MBs. 
Not requesting a replacement container.", + completedContainerId, currContainerMemoryMbs); return; } + int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER, + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); scalingDirectives.add(new ScalingDirective( DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, - System.currentTimeMillis(), + currTimeMillis + 3, optProfileDerivation )); reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index 6c0946aabbe..ef10006ea89 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -17,38 +17,109 @@ package org.apache.gobblin.temporal.yarn; -import java.net.URL; import java.util.Collections; +import java.util.List; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.base.Optional; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; + /** Tests for {@link DynamicScalingYarnService} */ public class DynamicScalingYarnServiceTest { private Config defaultConfigs; + private final int initNumContainers = 1; + private final int initMemoryMbs = 1024; + private final int initCores = 1; + private final Resource initResource = Resource.newInstance(initMemoryMbs, initCores); private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); private final EventBus eventBus = new EventBus("TemporalDynamicScalingYarnServiceTest"); + private AMRMClientAsync mockAMRMClient; + private RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse; + private WorkerProfile testBaselineworkerProfile; + private DynamicScalingYarnService dynamicScalingYarnServiceSpy; @BeforeClass - public void setup() { - URL url = DynamicScalingYarnServiceTest.class.getClassLoader() - .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // using same 
initial config as of YarnServiceTest - Assert.assertNotNull(url, "Could not find resource " + url); - this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + public void setup() throws Exception { + this.defaultConfigs = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(initCores)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(initMemoryMbs)) + .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(initNumContainers)); + + this.testBaselineworkerProfile = new WorkerProfile(this.defaultConfigs); + + mockAMRMClient = Mockito.mock(AMRMClientAsync.class); + mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class); + + MockedStatic amrmClientAsyncMockStatic = Mockito.mockStatic(AMRMClientAsync.class); + + amrmClientAsyncMockStatic.when(() -> AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class))) + .thenReturn(mockAMRMClient); + Mockito.doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class)); + + Mockito.when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString())) + .thenReturn(mockRegisterApplicationMasterResponse); + Mockito.when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability()) + .thenReturn(Mockito.mock(Resource.class)); + } + + @BeforeMethod + public void setupMethod() throws Exception { + DynamicScalingYarnService dynamicScalingYarnService = new DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", yarnConfiguration, mockFileSystem, eventBus); + dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService); + Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); + dynamicScalingYarnServiceSpy.containerMap.clear(); + } + + @AfterMethod + public void 
cleanupMethod() { + dynamicScalingYarnServiceSpy.containerMap.clear(); + Mockito.reset(dynamicScalingYarnServiceSpy); + } + + @Test + public void testDynamicScalingYarnServiceStartupWithInitialContainers() throws Exception { + dynamicScalingYarnServiceSpy.startUp(); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(initNumContainers), resourceCaptor.capture(), Mockito.any(Optional.class)); + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); } @Test @@ -61,4 +132,127 @@ public void testReviseWorkforcePlanAndRequestNewContainers() throws Exception { dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective)); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), Mockito.any(Resource.class), Mockito.any(Optional.class)); } + + @DataProvider(name = "OOMExitStatusProvider") + public Object[][] OOMExitStatusProvider() { + return new Object[][] { + {ContainerExitStatus.KILLED_EXCEEDED_PMEM}, + {ContainerExitStatus.KILLED_EXCEEDED_VMEM}, + {DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE} + }; + } + + @DataProvider(name = "NonOOMExitStatusProviderWhichRequestReplacementContainer") + public Object[][] NonOOMExitStatusProviderWhichRequestReplacementContainer() { + return new Object[][] { + {ContainerExitStatus.ABORTED}, + {ContainerExitStatus.PREEMPTED} + }; + } + + @Test(dataProvider = "OOMExitStatusProvider") + 
public void testHandleContainerCompletionForStatusOOM(int containerExitStatusCode) throws Exception { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); + Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); + + dynamicScalingYarnServiceSpy.startUp(); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); + + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + + @Test(dataProvider = "NonOOMExitStatusProviderWhichRequestReplacementContainer") + public void testHandleContainerCompletionForNonOOMStatusWhichRequestReplacementContainer(int containerExitStatusCode) throws Exception { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + 
ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); + Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); + + dynamicScalingYarnServiceSpy.startUp(); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + + @Test + public void testHandleContainerCompletionForAllOOMStatus() throws Exception { + ContainerId containerId1 = generateRandomContainerId(); + ContainerId containerId2 = generateRandomContainerId(); + ContainerId containerId3 = generateRandomContainerId(); + + DynamicScalingYarnService.ContainerInfo containerInfo1 = createBaselineContainerInfo(containerId1); + DynamicScalingYarnService.ContainerInfo containerInfo2 = createBaselineContainerInfo(containerId2); + DynamicScalingYarnService.ContainerInfo containerInfo3 = createBaselineContainerInfo(containerId3); + + ContainerStatus containerStatus1 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus1.getContainerId()).thenReturn(containerId1); + 
Mockito.when(containerStatus1.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_VMEM); + + ContainerStatus containerStatus2 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus2.getContainerId()).thenReturn(containerId2); + Mockito.when(containerStatus2.getExitStatus()).thenReturn(DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE); + + ContainerStatus containerStatus3 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus3.getContainerId()).thenReturn(containerId3); + Mockito.when(containerStatus3.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_PMEM); + + dynamicScalingYarnServiceSpy.startUp(); + // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + dynamicScalingYarnServiceSpy.containerMap.put(containerId1, containerInfo1); + dynamicScalingYarnServiceSpy.containerMap.put(containerId2, containerInfo2); + dynamicScalingYarnServiceSpy.containerMap.put(containerId3, containerInfo3); + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus1); + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus2); + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus3); + + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + List capturedResources = resourceCaptor.getAllValues(); + Assert.assertEquals(capturedResources.size(), 4); + + Resource capturedResource = capturedResources.get(0); + Assert.assertEquals(capturedResource.getMemorySize(), 
initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + + for (int idx = 1 ; idx < 4 ; idx++) { + capturedResource = capturedResources.get(idx); + Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + } + + private ContainerId generateRandomContainerId() { + return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), + 0), (long) (Math.random() * 1000)); + } + + private DynamicScalingYarnService.ContainerInfo createBaselineContainerInfo(ContainerId containerId) { + Container container = Container.newInstance(containerId, null, null, initResource, null, null); + return dynamicScalingYarnServiceSpy.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, testBaselineworkerProfile); + } } From 729bebbfe452389335d46932fb5ae1849955654a Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 10 Feb 2025 21:51:39 +0530 Subject: [PATCH 08/12] added test for non-replacement case --- .../yarn/DynamicScalingYarnServiceTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index ef10006ea89..d28218d6919 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -150,6 +150,19 @@ public Object[][] NonOOMExitStatusProviderWhichRequestReplacementContainer() { }; } + @DataProvider(name = "ExitStatusProviderWhichDoesNotRequestReplacementContainer") + public Object[][] ExitStatusProviderWhichDoesNotRequestReplacementContainer() { + return new 
Object[][] { + {ContainerExitStatus.SUCCESS}, + {ContainerExitStatus.INVALID}, + {ContainerExitStatus.DISKS_FAILED}, + {ContainerExitStatus.KILLED_BY_APPMASTER}, + {ContainerExitStatus.KILLED_BY_RESOURCEMANAGER}, + {ContainerExitStatus.KILLED_AFTER_APP_COMPLETION}, + {ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER} + }; + } + @Test(dataProvider = "OOMExitStatusProvider") public void testHandleContainerCompletionForStatusOOM(int containerExitStatusCode) throws Exception { ContainerId containerId = generateRandomContainerId(); @@ -246,6 +259,22 @@ public void testHandleContainerCompletionForAllOOMStatus() throws Exception { } } + @Test(dataProvider = "ExitStatusProviderWhichDoesNotRequestReplacementContainer") + public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplacementContainer(int containerExitStatusCode) throws Exception { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); + Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); + // All zero invocation since startup is not called and no new containers should be requested + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); + } + + private ContainerId generateRandomContainerId() { return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), 0), (long) (Math.random() * 1000)); From 861916aa5485bc9d3f02943047bd807088445ec2 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Tue, 11 Feb 2025 08:24:06 +0530 Subject: [PATCH 09/12] removed extra spaces --- .../yarn/DynamicScalingYarnServiceTest.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index d28218d6919..9423e8998ec 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -18,8 +18,8 @@ package org.apache.gobblin.temporal.yarn; import java.util.Collections; - import java.util.List; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -170,17 +170,12 @@ public void testHandleContainerCompletionForStatusOOM(int containerExitStatusCod ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); - - dynamicScalingYarnServiceSpy.startUp(); dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - 
dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - Resource capturedResource = resourceCaptor.getValue(); Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); Assert.assertEquals(capturedResource.getVirtualCores(), initCores); @@ -193,16 +188,12 @@ public void testHandleContainerCompletionForNonOOMStatusWhichRequestReplacementC ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); - - dynamicScalingYarnServiceSpy.startUp(); dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); ArgumentCaptor resourceCaptor = 
ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); Resource capturedResource = resourceCaptor.getValue(); Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); Assert.assertEquals(capturedResource.getVirtualCores(), initCores); @@ -230,7 +221,6 @@ public void testHandleContainerCompletionForAllOOMStatus() throws Exception { Mockito.when(containerStatus3.getContainerId()).thenReturn(containerId3); Mockito.when(containerStatus3.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_PMEM); - dynamicScalingYarnServiceSpy.startUp(); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated dynamicScalingYarnServiceSpy.containerMap.put(containerId1, containerInfo1); dynamicScalingYarnServiceSpy.containerMap.put(containerId2, containerInfo2); @@ -268,7 +258,6 @@ public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplace Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); - // All zero invocation since startup is not called and no new containers should be requested Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); 
Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); From f341c1e59e1d11731ad2f1b15c3e64b3cfd282a6 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 17 Feb 2025 10:34:55 +0530 Subject: [PATCH 10/12] addressed comments --- ...AbstractDynamicScalingYarnServiceManager.java | 6 +++++- .../temporal/yarn/DynamicScalingYarnService.java | 11 +++++------ .../DynamicScalingYarnServiceManagerTest.java | 16 ++++++++++++++++ .../yarn/DynamicScalingYarnServiceTest.java | 15 +++++++++++++++ 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index ca6aa720641..8d92b0edae5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -110,9 +110,13 @@ public void run() { List scalingDirectives = scalingDirectiveSource.getScalingDirectives(); if (CollectionUtils.isNotEmpty(scalingDirectives)) { dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } else { + dynamicScalingYarnService.calcDeltasAndRequestContainers(); } } catch (FileNotFoundException fnfe) { - log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace + log.debug("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace + // FNFE comes when scaling directives path is not yet created, so we should just calc delta & request containers if needed + dynamicScalingYarnService.calcDeltasAndRequestContainers(); } catch (IOException e) { log.error("Failed to get 
scaling directives", e); } catch (Throwable t) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 579c53106d2..1ba66d674f6 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -64,7 +64,7 @@ public class DynamicScalingYarnService extends YarnService { private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; - private final Queue removedContainerIds; + protected final Queue removedContainerIds; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -164,7 +164,11 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List iterator = removedContainerIds.iterator(); while (iterator.hasNext()) { @@ -176,15 +180,10 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List 0) { this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); - // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, - // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } iterator.remove(); } } - - this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index 8b0602f035f..666e3c54c4b 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.yarn; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +65,19 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup Thread.sleep(3000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); + } + + @Test + public void testWhenScalingDirectivesThrowsFNFE() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenThrow(FileNotFoundException.class); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); } /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @@ -77,6 +91,7 @@ public void testWithDummyScalingDirectiveSource() throws IOException, Interrupte Thread.sleep(7000); // 7 
seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); } @Test @@ -95,6 +110,7 @@ public void testWithRandomScalingDirectives() throws IOException, InterruptedExc Thread.sleep(5000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); } /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index 9423e8998ec..9556bccd1b4 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -263,6 +263,21 @@ public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplace Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); } + @Test + public void testContainerRequestedWhenCompletionCalledBeforeAllocated() { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + dynamicScalingYarnServiceSpy.removedContainerIds.add(containerId); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); + 
dynamicScalingYarnServiceSpy.calcDeltasAndRequestContainers(); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + private ContainerId generateRandomContainerId() { return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), From dee3fe4c4e533eb9eecb0d00918bc9375ee9cd91 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 20 Feb 2025 12:30:55 +0530 Subject: [PATCH 11/12] added a comment --- .../gobblin/temporal/yarn/DynamicScalingYarnService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 1ba66d674f6..0010a45ff8e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -59,6 +59,7 @@ public class DynamicScalingYarnService extends YarnService { protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB + private static final int EPSILON_MIILIS = 1; /** this holds the current count of containers already requested for each worker profile */ private final 
WorkforceStaffing actualWorkforceStaffing; @@ -235,10 +236,10 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 1); + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis); // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 2)); + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis)); } // Request a replacement container @@ -256,7 +257,7 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont scalingDirectives.add(new ScalingDirective( DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, - currTimeMillis + 3, + currTimeMillis + EPSILON_MIILIS, // Each scaling directive should have a newer timestamp than the previous one optProfileDerivation )); reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); From db9cf7ed56a937b5cb4f36834cd6b6d456e5e6ea Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Thu, 20 Feb 2025 12:34:36 +0530 Subject: [PATCH 12/12] corrected log statement --- .../temporal/yarn/AbstractDynamicScalingYarnServiceManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index 8d92b0edae5..e49ec66a22e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -114,8 +114,8 @@ public void run() { dynamicScalingYarnService.calcDeltasAndRequestContainers(); } } catch (FileNotFoundException fnfe) { - log.debug("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace // FNFE comes when scaling directives path is not yet created, so we should just calc delta & request containers if needed + log.debug("Scaling directives file not found(possibly not yet created). Falling back to delta calculation. - " + fnfe.getMessage()); dynamicScalingYarnService.calcDeltasAndRequestContainers(); } catch (IOException e) { log.error("Failed to get scaling directives", e);