diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 4d660fad52..579c53106d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -21,8 +21,8 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.collections.CollectionUtils; @@ -56,15 +56,15 @@ public class DynamicScalingYarnService extends YarnService { private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1; - private static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; - private static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; + protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; + protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; - private final Set removedContainerIds; + private final Queue removedContainerIds; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -73,7 +73,7 @@ public DynamicScalingYarnService(Config config, String applicationName, String a this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); - this.removedContainerIds = ConcurrentHashMap.newKeySet(); + this.removedContainerIds = new ConcurrentLinkedQueue<>(); this.profileNameSuffixGenerator = new AtomicLong(); } @@ -232,32 +232,32 @@ private synchronized void handleContainerExitedWithOOM(ContainerId completedCont List scalingDirectives = new ArrayList<>(); WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); + long currTimeMillis = System.currentTimeMillis(); // Update the current staffing to reflect the container that exited with OOM int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); + this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 1); // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); + scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis + 2)); } // Request a replacement container int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int newContainerMemoryMbs = currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER; - if (currContainerMemoryMbs < MAX_REPLACEMENT_CONTAINER_MEMORY_MBS && newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { - newContainerMemoryMbs = MAX_REPLACEMENT_CONTAINER_MEMORY_MBS; - } else if (newContainerMemoryMbs > MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { - log.warn("Expected replacement container memory exceeds the maximum allowed memory {}. Not requesting a replacement container.", - MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); + if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { + log.warn("Container {} already had max allowed memory {} MBs. Not requesting a replacement container.", + completedContainerId, currContainerMemoryMbs); return; } + int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER, + MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); Optional optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) )); scalingDirectives.add(new ScalingDirective( DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), 1, - System.currentTimeMillis(), + currTimeMillis + 3, optProfileDerivation )); reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index 6c0946aabb..ef10006ea8 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -17,38 +17,109 @@ package org.apache.gobblin.temporal.yarn; -import java.net.URL; import java.util.Collections; +import java.util.List; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.base.Optional; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; + /** Tests for {@link DynamicScalingYarnService} */ public class DynamicScalingYarnServiceTest { private Config defaultConfigs; + private final int initNumContainers = 1; + private final int initMemoryMbs = 1024; + private final int initCores = 1; + private final Resource initResource = Resource.newInstance(initMemoryMbs, initCores); private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); private final EventBus eventBus = new EventBus("TemporalDynamicScalingYarnServiceTest"); + private AMRMClientAsync mockAMRMClient; + private RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse; + private WorkerProfile testBaselineworkerProfile; + private DynamicScalingYarnService dynamicScalingYarnServiceSpy; @BeforeClass - public void setup() { - URL url = DynamicScalingYarnServiceTest.class.getClassLoader() - .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // using same initial config as of YarnServiceTest - Assert.assertNotNull(url, "Could not find resource " + url); - this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + public void setup() throws Exception { + this.defaultConfigs = ConfigFactory.empty() + .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(initCores)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(initMemoryMbs)) + .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(initNumContainers)); + + this.testBaselineworkerProfile = new WorkerProfile(this.defaultConfigs); + + mockAMRMClient = Mockito.mock(AMRMClientAsync.class); + mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class); + + MockedStatic amrmClientAsyncMockStatic = Mockito.mockStatic(AMRMClientAsync.class); + + amrmClientAsyncMockStatic.when(() -> AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class))) + .thenReturn(mockAMRMClient); + Mockito.doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class)); + + Mockito.when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString())) + .thenReturn(mockRegisterApplicationMasterResponse); + Mockito.when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability()) + .thenReturn(Mockito.mock(Resource.class)); + } + + @BeforeMethod + public void setupMethod() throws Exception { + DynamicScalingYarnService dynamicScalingYarnService = new DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", yarnConfiguration, mockFileSystem, eventBus); + dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService); + Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); + dynamicScalingYarnServiceSpy.containerMap.clear(); + } + + @AfterMethod + public void cleanupMethod() { + dynamicScalingYarnServiceSpy.containerMap.clear(); + Mockito.reset(dynamicScalingYarnServiceSpy); + } + + @Test + public void testDynamicScalingYarnServiceStartupWithInitialContainers() throws Exception { + dynamicScalingYarnServiceSpy.startUp(); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(initNumContainers), resourceCaptor.capture(), Mockito.any(Optional.class)); + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); } @Test @@ -61,4 +132,127 @@ public void testReviseWorkforcePlanAndRequestNewContainers() throws Exception { dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective)); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), Mockito.any(Resource.class), Mockito.any(Optional.class)); } + + @DataProvider(name = "OOMExitStatusProvider") + public Object[][] OOMExitStatusProvider() { + return new Object[][] { + {ContainerExitStatus.KILLED_EXCEEDED_PMEM}, + {ContainerExitStatus.KILLED_EXCEEDED_VMEM}, + {DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE} + }; + } + + @DataProvider(name = "NonOOMExitStatusProviderWhichRequestReplacementContainer") + public Object[][] NonOOMExitStatusProviderWhichRequestReplacementContainer() { + return new Object[][] { + {ContainerExitStatus.ABORTED}, + {ContainerExitStatus.PREEMPTED} + }; + } + + @Test(dataProvider = "OOMExitStatusProvider") + public void testHandleContainerCompletionForStatusOOM(int containerExitStatusCode) throws Exception { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); + Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); + + dynamicScalingYarnServiceSpy.startUp(); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); + + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + + @Test(dataProvider = "NonOOMExitStatusProviderWhichRequestReplacementContainer") + public void testHandleContainerCompletionForNonOOMStatusWhichRequestReplacementContainer(int containerExitStatusCode) throws Exception { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); + Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); + + dynamicScalingYarnServiceSpy.startUp(); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + + @Test + public void testHandleContainerCompletionForAllOOMStatus() throws Exception { + ContainerId containerId1 = generateRandomContainerId(); + ContainerId containerId2 = generateRandomContainerId(); + ContainerId containerId3 = generateRandomContainerId(); + + DynamicScalingYarnService.ContainerInfo containerInfo1 = createBaselineContainerInfo(containerId1); + DynamicScalingYarnService.ContainerInfo containerInfo2 = createBaselineContainerInfo(containerId2); + DynamicScalingYarnService.ContainerInfo containerInfo3 = createBaselineContainerInfo(containerId3); + + ContainerStatus containerStatus1 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus1.getContainerId()).thenReturn(containerId1); + Mockito.when(containerStatus1.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_VMEM); + + ContainerStatus containerStatus2 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus2.getContainerId()).thenReturn(containerId2); + Mockito.when(containerStatus2.getExitStatus()).thenReturn(DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE); + + ContainerStatus containerStatus3 = Mockito.mock(ContainerStatus.class); + Mockito.when(containerStatus3.getContainerId()).thenReturn(containerId3); + Mockito.when(containerStatus3.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_PMEM); + + dynamicScalingYarnServiceSpy.startUp(); + // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated + dynamicScalingYarnServiceSpy.containerMap.put(containerId1, containerInfo1); + dynamicScalingYarnServiceSpy.containerMap.put(containerId2, containerInfo2); + dynamicScalingYarnServiceSpy.containerMap.put(containerId3, containerInfo3); + + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus1); + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus2); + dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus3); + + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + + List capturedResources = resourceCaptor.getAllValues(); + Assert.assertEquals(capturedResources.size(), 4); + + Resource capturedResource = capturedResources.get(0); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + + for (int idx = 1 ; idx < 4 ; idx++) { + capturedResource = capturedResources.get(idx); + Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + } + + private ContainerId generateRandomContainerId() { + return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), + 0), (long) (Math.random() * 1000)); + } + + private DynamicScalingYarnService.ContainerInfo createBaselineContainerInfo(ContainerId containerId) { + Container container = Container.newInstance(containerId, null, null, initResource, null, null); + return dynamicScalingYarnServiceSpy.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, testBaselineworkerProfile); + } }