From e8f757412f64fca8d0aa2866fd8c6269aa997739 Mon Sep 17 00:00:00 2001 From: Vivek Rai Date: Mon, 13 Jan 2025 15:43:52 +0530 Subject: [PATCH] initial dynamicscalingyarnservice changes for handleContainerCompletion --- .../yarn/DynamicScalingYarnService.java | 158 +++++++++- .../gobblin/temporal/yarn/YarnService.java | 286 ++++-------------- .../temporal/yarn/YarnServiceTest.java | 21 +- 3 files changed, 217 insertions(+), 248 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 0720017b852..fa991b39f78 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -17,10 +17,20 @@ package org.apache.gobblin.temporal.yarn; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.eventbus.EventBus; @@ -28,6 +38,8 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; import org.apache.gobblin.temporal.dynamic.ScalingDirective; import org.apache.gobblin.temporal.dynamic.StaffingDeltas; import org.apache.gobblin.temporal.dynamic.WorkerProfile; @@ -47,6 +59,8 @@ public class DynamicScalingYarnService extends YarnService { private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; + private final Set removedContainerIds; + private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -54,6 +68,8 @@ public DynamicScalingYarnService(Config config, String applicationName, String a this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); + this.removedContainerIds = ConcurrentHashMap.newKeySet(); + this.profileNameSuffixGenerator = new AtomicLong(); } @Override @@ -62,6 +78,71 @@ protected synchronized void requestInitialContainers() { requestNewContainersForStaffingDeltas(deltas); } + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited depending on the exit status. + *
+   * <p>
+   * A container completes in either of the following conditions:
+   * <ol>
+   *   <li>The container gets stopped by the ApplicationMaster.</li>
+   *   <li>Some error happens in the container and causes it to exit.</li>
+   *   <li>The container gets preempted by the ResourceManager.</li>
+   *   <li>The container gets killed, for example because it runs over the allowed amount of virtual or physical memory.</li>
+   * </ol>
+   * A replacement container is needed in all except the first case.
+   */
+  @Override
+  protected void handleContainerCompletion(ContainerStatus containerStatus) {
+    ContainerId completedContainerId = containerStatus.getContainerId();
+    ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId);
+
+    if (completedContainerInfo == null) {
+      log.warn("Container {} not found in containerMap; onContainersCompleted() was likely called before onContainersAllocated()",
+          completedContainerId);
+      this.removedContainerIds.add(completedContainerId);
+      return;
+    }
+
+    log.info("Container {} running profile {} completed with exit status {}",
+        completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()
+    );
+
+    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
+      log.info("Container {} running profile {} completed with diagnostics: {}",
+          completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()
+      );
+    }
+
+    if (this.shutdownInProgress) {
+      log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId);
+      return;
+    }
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+
+    switch (containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        handleAbortedContainer(completedContainerId, completedContainerInfo);
+        break;
+      case(ContainerExitStatus.PREEMPTED):
+        log.info("Container {} for profile {} preempted, starting to launch a replacement container",
+            completedContainerId, completedContainerInfo.getWorkerProfileName());
+        requestContainersForWorkerProfile(workerProfile, 1);
+        break;
+      case(137): // General OOM exit status
+      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
+      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
+        handleContainerExitedWithOOM(completedContainerId, completedContainerInfo);
+        break;
+      case(1): // Same as Linux exit status 1; often occurs when launch_container.sh fails
+        log.info("Exit status 1. CompletedContainerInfo = {}", completedContainerInfo);
+        break;
+      default:
+        break;
+    }
+  }
+
   /**
    * Revises the workforce plan and requests new containers based on the given scaling directives.
    *
@@ -71,6 +152,23 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
    */
   public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
+    // Containers that completed before being recorded in containerMap are tracked in removedContainerIds;
+    // reconcile them here by removing them from the map and decrementing their profile's actual staffing
+    Iterator<ContainerId> iterator = removedContainerIds.iterator();
+    while (iterator.hasNext()) {
+      ContainerId containerId = iterator.next();
+      ContainerInfo containerInfo = this.containerMap.remove(containerId);
+      if (containerInfo != null) {
+        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
+        int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+        if (currNumContainers > 0) {
+          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
+              System.currentTimeMillis());
+        }
+        iterator.remove();
+      }
+    }
+
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
     StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
@@ -78,22 +176,64 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
   private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      if (profileDelta.getDelta() > 0) { // scale up!
-      WorkerProfile workerProfile = profileDelta.getProfile();
-      String profileName = workerProfile.getName();
-      int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-      int delta = profileDelta.getDelta();
+      WorkerProfile workerProfile = profileDelta.getProfile();
+      String profileName = workerProfile.getName();
+      int delta = profileDelta.getDelta();
+      int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+      if (delta > 0) { // scale up!
         log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
-      } else if (profileDelta.getDelta() < 0) { // scale down!
-        // TODO: Decide how to handle negative deltas
-        log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ",
-            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } else if (delta < 0) { // scale down!
+        log.info("Releasing {} containers for profile {} having currently {} containers", -delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        releaseContainersForWorkerProfile(profileName, -delta);
+        // update our staffing after releasing containers
+        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
+        this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis());
       } // else, already at staffing plan (or at least have requested, so in-progress)
     });
   }
 
+  private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    // Case 1 : Container release was requested while scaling down
+    if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) {
+      log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName());
+      this.releasedContainerCache.invalidate(completedContainerId);
+      return;
+    }
+
+    // Case 2 : Container release was not requested, so request a replacement container
+    log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName());
+    requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
+  }
+
+  private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) {
+    log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container",
+        completedContainerId, completedContainerInfo.getWorkerProfileName());
+
+    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
+    // Update the current staffing to reflect the container that exited with OOM
+    int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
+    if (currNumContainers > 0) {
+      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis());
+    }
+
+    // Request a replacement container with double the memory
+    int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    int newContainerMemoryMbs = currContainerMemoryMbs * 2; //TODO: make it configurable or auto-tunable
+    Optional<ProfileDerivation>
optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), + new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) + )); + ScalingDirective scalingDirective = new ScalingDirective( + workerProfile.getName() + "-" + profileNameSuffixGenerator.getAndIncrement(), + 1, + System.currentTimeMillis(), + optProfileDerivation + ); + reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(scalingDirective)); + } + } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index ec4da215a63..95220aa99c8 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -81,7 +81,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; import com.google.common.io.Closer; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; @@ -106,10 +105,8 @@ import org.apache.gobblin.yarn.GobblinYarnEventConstants; import org.apache.gobblin.yarn.GobblinYarnMetricTagNames; import org.apache.gobblin.yarn.YarnHelixUtils; -import org.apache.gobblin.yarn.event.ContainerReleaseRequest; -import org.apache.gobblin.yarn.event.ContainerShutdownRequest; -import org.apache.gobblin.yarn.event.NewContainerRequest; import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; /** * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration, @@ -124,13 +121,9 @@ class YarnService extends AbstractIdleService { private static final Logger LOGGER = LoggerFactory.getLogger(YarnService.class); - private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN"; - private final String applicationName; private final String applicationId; private final String appViewAcl; - //Default helix instance tag derived from cluster level config - private final String helixInstanceTags; protected final Config config; private final EventBus eventBus; @@ -146,11 +139,7 @@ class YarnService extends AbstractIdleService { private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; - private final int requestedContainerMemoryMbs; - private final int requestedContainerCores; private final boolean containerHostAffinityEnabled; - - private final int helixInstanceMaxRetries; private final String containerTimezone; private final String proxyJvmArgs; @@ -164,34 +153,18 @@ class YarnService extends AbstractIdleService { private final Object allContainersStopped = new Object(); - // A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag - @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private final ConcurrentMap containerMap = Maps.newConcurrentMap(); + // A map from container IDs to Container instances, WorkerProfile Name and WorkerProfile Object + protected final ConcurrentMap containerMap = new ConcurrentHashMap<>(); // A cache of the containers with an outstanding container release request. // This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested // release. 
- @VisibleForTesting - @Getter(AccessLevel.PROTECTED) - private final Cache releasedContainerCache; - - // A map from Helix instance names to the number times the instances are retried to be started - private final ConcurrentMap helixInstanceRetryCount = Maps.newConcurrentMap(); - - // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put - // into the set if the container running the instance completes. Unused Helix - // instance names get picked up when replacement containers get allocated. - private final Set unusedHelixInstanceNames = ConcurrentHashMap.newKeySet(); - - // The map from helix tag to allocated container count - private final ConcurrentMap allocatedContainerCountMap = Maps.newConcurrentMap(); - private final ConcurrentMap removedContainerID = Maps.newConcurrentMap(); + protected final Cache releasedContainerCache; private final AtomicInteger priorityNumGenerator = new AtomicInteger(0); private final Map resourcePriorityMap = new HashMap<>(); - private volatile boolean shutdownInProgress = false; + protected volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L; @@ -226,14 +199,8 @@ public YarnService(Config config, String applicationName, String applicationId, this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); - this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); - this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES); - this.helixInstanceTags = ConfigUtils.getString(config, - GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG); - this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? 
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -257,53 +224,10 @@ public YarnService(Config config, String applicationName, String applicationId, } - @SuppressWarnings("unused") - @Subscribe - public void handleNewContainerRequest(NewContainerRequest newContainerRequest) { - if (!this.maxResourceCapacity.isPresent()) { - LOGGER.error(String.format( - "Unable to handle new container request as maximum resource capacity is not available: " - + "[memory (MBs) requested = %d, vcores requested = %d]", this.requestedContainerMemoryMbs, - this.requestedContainerCores)); - return; - } - requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()), - newContainerRequest.getResource()); - } - protected NMClientCallbackHandler getNMClientCallbackHandler() { return new NMClientCallbackHandler(); } - @SuppressWarnings("unused") - @Subscribe - public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) { - for (Container container : containerShutdownRequest.getContainers()) { - LOGGER.info(String.format("Stopping container %s running on %s", container.getId(), container.getNodeId())); - this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); - } - } - - /** - * Request the Resource Manager to release the container - * @param containerReleaseRequest containers to release - */ - @Subscribe - public void handleContainerReleaseRequest(ContainerReleaseRequest containerReleaseRequest) { - for (Container container : containerReleaseRequest.getContainers()) { - LOGGER.info(String.format("Releasing container %s running on %s", container.getId(), container.getNodeId())); - - // Record that this container was explicitly released so that a new one is not spawned to replace it - // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() - // can check for the container id and skip spawning a replacement container. - // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently - // with the release call. So in some cases a replacement container may have already been spawned before - // the container is put into the black list. 
- this.releasedContainerCache.put(container.getId(), ""); - this.amrmClientAsync.releaseAssignedContainer(container.getId()); - } - } - @Override protected synchronized void startUp() throws Exception { LOGGER.info("Starting the TemporalYarnService"); @@ -335,8 +259,8 @@ protected void shutDown() throws IOException { // Stop the running containers for (ContainerInfo containerInfo : this.containerMap.values()) { - LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(), - containerInfo.getHelixParticipantId()); + LOGGER.info("Stopping container {} running worker profile {}", containerInfo.getContainer().getId(), + containerInfo.getWorkerProfileName()); this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId()); } @@ -411,10 +335,24 @@ protected synchronized void requestContainersForWorkerProfile(WorkerProfile work requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } - private void requestContainer(Optional preferredNode, Optional resourceOptional) { - Resource desiredResource = resourceOptional.or(Resource.newInstance( - this.requestedContainerMemoryMbs, this.requestedContainerCores)); - requestContainer(preferredNode, desiredResource, Optional.absent()); + protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) { + Iterator> containerMapIterator = this.containerMap.entrySet().iterator(); + while (containerMapIterator.hasNext() && numContainers > 0) { + Map.Entry entry = containerMapIterator.next(); + if (entry.getValue().getWorkerProfile().getName().equals(profileName)) { + ContainerId containerId = entry.getKey(); + LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName)); + // Record that this container was explicitly released so that a new one is not spawned to replace it + // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() + // can check for the container id and skip spawning a replacement container. + // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently + // with the release call. So in some cases a replacement container may have already been spawned before + // the container is put into the black list. 
+ this.releasedContainerCache.put(containerId, ""); + this.amrmClientAsync.releaseAssignedContainer(containerId); + numContainers--; + } + } } /** @@ -424,7 +362,7 @@ private void requestContainer(Optional preferredNode, Optional * @param resource */ protected void requestContainers(int numContainers, Resource resource, Optional optAllocationRequestId) { - LOGGER.info("Requesting {} containers with resource={} and allocation request id = {}", numContainers, resource, optAllocationRequestId); + LOGGER.info("Requesting {} containers with resource = {} and allocation request id = {}", numContainers, resource, optAllocationRequestId); IntStream.range(0, numContainers) .forEach(i -> requestContainer(Optional.absent(), resource, optAllocationRequestId)); } @@ -553,16 +491,7 @@ protected ByteBuffer getSecurityTokens() throws IOException { } @VisibleForTesting - protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { - long allocationRequestId = container.getAllocationRequestId(); - WorkerProfile workerProfile = Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId)) - .or(() -> { - LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); - return this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { - LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); - return new WorkerProfile(this.config); - }); - }); + protected String buildContainerCommand(Container container, String workerProfileName, WorkerProfile workerProfile) { Config workerProfileConfig = workerProfile.getConfig(); double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig, @@ -574,13 +503,13 @@ protected String buildContainerCommand(Container container, String helixParticip GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && workerJvmMemoryXmxRatio <= 1, - workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + + workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); long containerMemoryMbs = container.getResource().getMemorySize(); Preconditions.checkArgument(workerJvmMemoryOverheadMbs < containerMemoryMbs * workerJvmMemoryXmxRatio, - workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + + workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); @@ -602,14 +531,8 @@ protected String buildContainerCommand(Container container, String helixParticip .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME) - .append(" ").append(this.applicationId) - .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) - .append(" ").append(helixParticipantId); + .append(" ").append(this.applicationId); - if (!Strings.isNullOrEmpty(helixInstanceTag)) { - containerCommand.append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) - .append(" ").append(helixInstanceTag); - } return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( @@ -637,103 +560,11 @@ private boolean shouldStickToTheSameNode(int containerExitStatus) { } /** - * Handle the completion of a container. A new container will be requested to replace the one - * that just exited. Depending on the exit status and if container host affinity is enabled, - * the new container may or may not try to be started on the same node. - *
- * A container completes in either of the following conditions: 1) some error happens in the - * container and caused the container to exit, 2) the container gets killed due to some reason, - * for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets - * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster. - * A replacement container is needed in all but the last case. + * Handle the completion of a container. + * Just removes the containerId from {@link #containerMap} */ protected void handleContainerCompletion(ContainerStatus containerStatus) { - ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId()); - //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might - //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the - //containerId missing from the containersMap. - // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method - if (completedContainerInfo == null) { - removedContainerID.putIfAbsent(containerStatus.getContainerId(), ""); - } - String completedInstanceName = UNKNOWN_HELIX_INSTANCE; - - String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag(); - if (completedContainerInfo != null) { - allocatedContainerCountMap.get(helixTag).decrementAndGet(); - } - - LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d", - containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus())); - - if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) { - LOGGER.info(String.format("Received the following diagnostics information for container %s: %s", - containerStatus.getContainerId(), containerStatus.getDiagnostics())); - } - - switch(containerStatus.getExitStatus()) { - case(ContainerExitStatus.ABORTED): - if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) { - return; - } - break; - case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed - LOGGER.info("Exit status 1. 
CompletedContainerInfo={}", completedContainerInfo); - break; - default: - break; - } - - if (this.shutdownInProgress) { - return; - } - if(completedContainerInfo != null) { - this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0)); - int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet(); - - // Populate event metadata - Optional> eventMetadataBuilder = Optional.absent(); - if (this.eventSubmitter.isPresent()) { - eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus)); - eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName); - eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + ""); - } - - if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) { - if (this.eventSubmitter.isPresent()) { - this.eventSubmitter.get() - .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build()); - } - - LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName); - return; - } - - // Add the Helix instance name of the completed container to the set of unused - // instance names so they can be reused by a replacement container. - LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); - this.unusedHelixInstanceNames.add(completedInstanceName); - - /** - * NOTE: logic for handling container failure is removed because {@link #YarnService} relies on the auto scaling manager - * to control the number of containers by polling helix for the current number of tasks - * Without that integration, that code requests too many containers when there are exceptions and overloads yarn - */ - } - } - - private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo, - String completedInstanceName) { - if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) { - LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId()); - if (completedContainerInfo != null) { - LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); - this.unusedHelixInstanceNames.add(completedInstanceName); - } - return true; - } - LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId()); - return false; + this.containerMap.remove(containerStatus.getContainerId()); } private ImmutableMap.Builder buildContainerStatusEventMetadata(ContainerStatus containerStatus) { @@ -782,33 +613,28 @@ public void onContainersCompleted(List statuses) { @Override public void onContainersAllocated(List containers) { for (final Container container : containers) { + long allocationRequestId = container.getAllocationRequestId(); + WorkerProfile workerProfile = Optional.fromNullable(workerProfileByAllocationRequestId.get(allocationRequestId)) + .or(() -> { + LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); + return workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { + LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); + return new WorkerProfile(config); + }); + }); + String containerId = 
container.getId().toString(); - String containerHelixTag = helixInstanceTags; if (eventSubmitter.isPresent()) { eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION, GobblinYarnMetricTagNames.CONTAINER_ID, containerId); } - LOGGER.info("Container {} has been allocated with resource {} for helix tag {}", - container.getId(), container.getResource(), containerHelixTag); - - //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live. - //Once we find a candidate instance, it is removed from the set. - String instanceName = null; - - //Ensure that updates to unusedHelixInstanceNames are visible to other threads that might concurrently - //invoke the callback on container allocation. - synchronized (this) { - Iterator iterator = unusedHelixInstanceNames.iterator(); - while (iterator.hasNext()) { - instanceName = iterator.next(); - } - } + LOGGER.info("Container {} has been allocated with resource {} for Worker Profile {}", + container.getId(), container.getResource(), WorkforceProfiles.renderName(workerProfile.getName())); - ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag); + ContainerInfo containerInfo = new ContainerInfo(container, + WorkforceProfiles.renderName(workerProfile.getName()), workerProfile); containerMap.put(container.getId(), containerInfo); - allocatedContainerCountMap.putIfAbsent(containerHelixTag, new AtomicInteger(0)); - allocatedContainerCountMap.get(containerHelixTag).incrementAndGet(); // Find matching requests and remove the request (YARN-660). We the scheduler are responsible // for cleaning up requests after allocation based on the design in the described ticket. @@ -969,26 +795,26 @@ public void onStopContainerError(ContainerId containerId, Throwable t) { } } - // Class encapsulates Container instances, Helix participant IDs of the containers, Helix Tag, and + // Class encapsulates Container instance, WorkerProfile name to print, WorkerProfile, and // initial startup command @Getter class ContainerInfo { private final Container container; - private final String helixParticipantId; - private final String helixTag; + private final String workerProfileName; // Storing this to avoid calling WorkforceProfiles.renderName(workerProfile.getName()) while logging + private final WorkerProfile workerProfile; private final String startupCommand; - public ContainerInfo(Container container, String helixParticipantId, String helixTag) { + public ContainerInfo(Container container, String workerProfileName, WorkerProfile workerProfile) { this.container = container; - this.helixParticipantId = helixParticipantId; - this.helixTag = helixTag; - this.startupCommand = YarnService.this.buildContainerCommand(container, helixParticipantId, helixTag); + this.workerProfileName = workerProfileName; + this.workerProfile = workerProfile; + this.startupCommand = YarnService.this.buildContainerCommand(container, workerProfileName, workerProfile); } @Override public String toString() { - return String.format("ContainerInfo{ container=%s, helixParticipantId=%s, helixTag=%s, startupCommand=%s }", - container.getId(), helixParticipantId, helixTag, startupCommand); + return String.format("ContainerInfo{ container=%s, workerProfileName=%s, startupCommand=%s }", + container.getId(), workerProfileName, startupCommand); } } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 3c81316b85c..8d216450a34 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,8 +20,13 @@ import java.io.IOException; import java.net.URL; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -103,12 +108,6 @@ public void testBuildContainerCommand() throws Exception { .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); - Resource resource = Resource.newInstance(resourceMemoryMB, 2); - - Container mockContainer = Mockito.mock(Container.class); - Mockito.when(mockContainer.getResource()).thenReturn(resource); - Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L); - YarnService yarnService = new YarnService( config, "testApplicationName", @@ -118,9 +117,13 @@ public void testBuildContainerCommand() throws Exception { eventBus ); - yarnService.startUp(); - - String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); + WorkerProfile workerProfile = new WorkerProfile(config); + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), + 0), 0); + Resource resource = Resource.newInstance(resourceMemoryMB, 2); + Container container = Container.newInstance(containerId, null, null, resource, null, null); + YarnService.ContainerInfo containerInfo = yarnService.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, workerProfile); + String command = containerInfo.getStartupCommand(); Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); } }
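---

Note for reviewers, outside the patch proper: the OOM branch above scales memory up by submitting a ScalingDirective whose derived profile overrides only the container-memory key. Below is a minimal sketch of driving the same public API from a caller; the class name, service handle, and parameter values are illustrative assumptions, while the ScalingDirective / ProfileDerivation / ProfileOverlay usage mirrors handleContainerExitedWithOOM in the patch.

import java.util.Collections;
import java.util.Optional;

import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;

/** Illustrative sketch only; not part of this patch. */
public class ScalingDirectiveSketch {
  /**
   * Submit a directive for one replacement container under a profile derived from
   * {@code basisProfileName}, overriding only the container memory key.
   */
  public static void requestUpsizedReplacement(DynamicScalingYarnService service, // hypothetical handle
      String basisProfileName, int newContainerMemoryMbs, long nameSuffix) {
    // Derive a new profile from the basis profile, overlaying just the memory setting
    Optional<ProfileDerivation> derivation = Optional.of(new ProfileDerivation(basisProfileName,
        new ProfileOverlay.Adding(new ProfileOverlay.KVPair(
            GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + ""))));
    // Set point of 1 requests exactly one container under the (uniquely named) derived profile
    ScalingDirective directive = new ScalingDirective(
        basisProfileName + "-" + nameSuffix, 1, System.currentTimeMillis(), derivation);
    service.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(directive));
  }
}

Routing the replacement through reviseWorkforcePlanAndRequestNewContainers, rather than requesting a container directly, keeps actualWorkforceStaffing and the workforce plan in sync, which appears to be why the OOM handler in the patch composes a directive instead of calling requestContainersForWorkerProfile itself.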