diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index ca6aa72064..8d92b0edae 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -110,9 +110,13 @@ public void run() { List scalingDirectives = scalingDirectiveSource.getScalingDirectives(); if (CollectionUtils.isNotEmpty(scalingDirectives)) { dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } else { + dynamicScalingYarnService.calcDeltasAndRequestContainers(); } } catch (FileNotFoundException fnfe) { - log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace + log.debug("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace + // FNFE comes when scaling directives path is not yet created, so we should just calc delta & request containers if needed + dynamicScalingYarnService.calcDeltasAndRequestContainers(); } catch (IOException e) { log.error("Failed to get scaling directives", e); } catch (Throwable t) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 579c53106d..1ba66d674f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -64,7 +64,7 @@ public class DynamicScalingYarnService extends YarnService { private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; - private final Queue removedContainerIds; + protected final Queue removedContainerIds; private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, @@ -164,7 +164,11 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List iterator = removedContainerIds.iterator(); while (iterator.hasNext()) { @@ -176,15 +180,10 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List 0) { this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()); - // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, - // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis())); } iterator.remove(); } } - - this.workforcePlan.reviseWhenNewer(scalingDirectives); StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index 8b0602f035..666e3c54c4 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.yarn; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +65,19 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup Thread.sleep(3000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); + } + + @Test + public void testWhenScalingDirectivesThrowsFNFE() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenThrow(FileNotFoundException.class); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); } /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @@ -77,6 +91,7 @@ public void testWithDummyScalingDirectiveSource() throws IOException, Interrupte Thread.sleep(7000); // 7 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); } @Test @@ -95,6 +110,7 @@ public void testWithRandomScalingDirectives() throws IOException, InterruptedExc Thread.sleep(5000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); } /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index 9423e8998e..9556bccd1b 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -263,6 +263,21 @@ public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplace Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); } + @Test + public void testContainerRequestedWhenCompletionCalledBeforeAllocated() { + ContainerId containerId = generateRandomContainerId(); + DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); + dynamicScalingYarnServiceSpy.removedContainerIds.add(containerId); + dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); + dynamicScalingYarnServiceSpy.calcDeltasAndRequestContainers(); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); + ArgumentCaptor resourceCaptor = ArgumentCaptor.forClass(Resource.class); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); + Resource capturedResource = resourceCaptor.getValue(); + Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); + Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + } + private ContainerId generateRandomContainerId() { return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),