Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Blazer-007 committed Feb 17, 2025
1 parent 861916a commit f341c1e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ public void run() {
List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
if (CollectionUtils.isNotEmpty(scalingDirectives)) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
} else {
dynamicScalingYarnService.calcDeltasAndRequestContainers();
}
} catch (FileNotFoundException fnfe) {
log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace
log.debug("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace
// FNFE comes when scaling directives path is not yet created, so we should just calc delta & request containers if needed
dynamicScalingYarnService.calcDeltasAndRequestContainers();
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class DynamicScalingYarnService extends YarnService {
private final WorkforceStaffing actualWorkforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;
private final Queue<ContainerId> removedContainerIds;
protected final Queue<ContainerId> removedContainerIds;
private final AtomicLong profileNameSuffixGenerator;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
Expand Down Expand Up @@ -164,7 +164,11 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
this.workforcePlan.reviseWhenNewer(scalingDirectives);
calcDeltasAndRequestContainers();
}

public synchronized void calcDeltasAndRequestContainers() {
// Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated()
Iterator<ContainerId> iterator = removedContainerIds.iterator();
while (iterator.hasNext()) {
Expand All @@ -176,15 +180,10 @@ public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<Scaling
if (currNumContainers > 0) {
this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1,
System.currentTimeMillis());
// Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile,
// otherwise extra containers will be requested when calculating deltas
scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, System.currentTimeMillis()));
}
iterator.remove();
}
}

this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.temporal.yarn;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -64,6 +65,19 @@ public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, Interrup
Thread.sleep(3000);
testDynamicScalingYarnServiceManager.shutDown();
Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers();
}

@Test
public void testWhenScalingDirectivesThrowsFNFE() throws IOException, InterruptedException {
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenThrow(FileNotFoundException.class);
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(
mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
testDynamicScalingYarnServiceManager.startUp();
Thread.sleep(2000);
testDynamicScalingYarnServiceManager.shutDown();
Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers();
}

/** Note : this test uses {@link DummyScalingDirectiveSource}*/
Expand All @@ -77,6 +91,7 @@ public void testWithDummyScalingDirectiveSource() throws IOException, Interrupte
Thread.sleep(7000); // 7 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times
testDynamicScalingYarnServiceManager.shutDown();
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers();
}

@Test
Expand All @@ -95,6 +110,7 @@ public void testWithRandomScalingDirectives() throws IOException, InterruptedExc
Thread.sleep(5000);
testDynamicScalingYarnServiceManager.shutDown();
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers();
}

/** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,21 @@ public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplace
Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class));
}

@Test
public void testContainerRequestedWhenCompletionCalledBeforeAllocated() {
ContainerId containerId = generateRandomContainerId();
DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId);
dynamicScalingYarnServiceSpy.removedContainerIds.add(containerId);
dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo);
dynamicScalingYarnServiceSpy.calcDeltasAndRequestContainers();
Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt());
ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class);
Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class));
Resource capturedResource = resourceCaptor.getValue();
Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs);
Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
}


private ContainerId generateRandomContainerId() {
return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0),
Expand Down

0 comments on commit f341c1e

Please sign in to comment.