Skip to content

Commit ee4d5b8

Browse files
1996fanrui authored and gyfora committed
[FLINK-30214][mini_cluster] MiniCluster adapt the jobvertex-parallelism-overrides
1 parent d2cf55c commit ee4d5b8

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.configuration.ConfigConstants;
2222
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.PipelineOptions;
2324
import org.apache.flink.configuration.RestOptions;
2425
import org.apache.flink.configuration.TaskManagerOptions;
2526
import org.apache.flink.core.execution.JobClient;
@@ -34,6 +35,7 @@
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

38+
import java.util.Map;
3739
import java.util.concurrent.CompletableFuture;
3840
import java.util.function.Function;
3941

@@ -127,14 +129,24 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) {
127129
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
128130
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
129131

132+
Map<String, String> overwriteParallelisms =
133+
configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
134+
if (overwriteParallelisms != null) {
135+
for (String overrideParallelism : overwriteParallelisms.values()) {
136+
maximumParallelism =
137+
Math.max(maximumParallelism, Integer.parseInt(overrideParallelism));
138+
}
139+
}
140+
141+
int finalMaximumParallelism = maximumParallelism;
130142
int numSlotsPerTaskManager =
131143
configuration
132144
.getOptional(TaskManagerOptions.NUM_TASK_SLOTS)
133145
.orElseGet(
134146
() ->
135-
maximumParallelism > 0
147+
finalMaximumParallelism > 0
136148
? MathUtils.divideRoundUp(
137-
maximumParallelism, numTaskManagers)
149+
finalMaximumParallelism, numTaskManagers)
138150
: TaskManagerOptions.NUM_TASK_SLOTS.defaultValue());
139151

140152
return new MiniClusterConfiguration.Builder()

flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,28 @@
2121
import org.apache.flink.api.common.JobExecutionResult;
2222
import org.apache.flink.api.common.JobStatus;
2323
import org.apache.flink.configuration.Configuration;
24+
import org.apache.flink.configuration.PipelineOptions;
2425
import org.apache.flink.core.execution.JobClient;
2526
import org.apache.flink.core.execution.SavepointFormatType;
27+
import org.apache.flink.runtime.execution.Environment;
2628
import org.apache.flink.runtime.jobgraph.JobGraph;
2729
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
2830
import org.apache.flink.runtime.jobgraph.JobVertex;
2931
import org.apache.flink.runtime.minicluster.MiniCluster;
32+
import org.apache.flink.runtime.testutils.CancelableInvokable;
3033
import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
3134

35+
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
36+
3237
import org.junit.jupiter.api.AfterEach;
3338
import org.junit.jupiter.api.Test;
3439

3540
import java.time.Duration;
3641
import java.util.Map;
42+
import java.util.concurrent.CountDownLatch;
3743
import java.util.concurrent.ExecutionException;
3844

45+
import static org.apache.flink.util.Preconditions.checkState;
3946
import static org.assertj.core.api.Assertions.assertThat;
4047
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4148

@@ -152,9 +159,44 @@ void testJobClientInteractionAfterShutdown() throws Exception {
152159
"MiniCluster is not yet running or has already been shut down.");
153160
}
154161

162+
/**
 * Submits a job whose single vertex has its parallelism raised by one through {@link
 * PipelineOptions#PARALLELISM_OVERRIDES}, waits until that many subtasks have started, then
 * cancels the job and checks that the mini cluster is shut down.
 */
@Test
163+
void testTurnUpParallelismByOverwriteParallelism() throws Exception {
164+
JobVertex jobVertex = getBlockingJobVertex();
165+
JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
166+
// The override asks for one more subtask than the vertex itself declares.
int overwriteParallelism = jobVertex.getParallelism() + 1;
167+
// Arm the shared latch so it opens only after overwriteParallelism subtasks have counted down.
BlockingInvokable.reset(overwriteParallelism);
168+
169+
Configuration configuration = new Configuration();
170+
configuration.set(
171+
PipelineOptions.PARALLELISM_OVERRIDES,
172+
// Key is the vertex ID as a hex string; value is the overridden parallelism as a string.
ImmutableMap.of(
173+
jobVertex.getID().toHexString(), String.valueOf(overwriteParallelism)));
174+
175+
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster(configuration);
176+
JobClient jobClient =
177+
perJobMiniClusterFactory
178+
.submitJob(jobGraph, ClassLoader.getSystemClassLoader())
179+
.get();
180+
181+
// Wait until overwriteParallelism subtasks have entered BlockingInvokable#doInvoke.
// NOTE(review): there is no timeout here; presumably the test hangs if the cluster
// cannot run that many subtasks at once - confirm this is acceptable.
182+
BlockingInvokable.latch.await();
183+
184+
jobClient.cancel().get();
185+
assertThat(jobClient.getJobExecutionResult())
186+
.failsWithin(Duration.ofSeconds(1))
187+
.withThrowableOfType(ExecutionException.class)
188+
.withMessageContaining("Job was cancelled");
189+
190+
assertThatMiniClusterIsShutdown();
191+
}
192+
155193
/** Creates the factory under test from a fresh, empty {@link Configuration}. */
private PerJobMiniClusterFactory initializeMiniCluster() {
194+
return initializeMiniCluster(new Configuration());
195+
}
196+
197+
private PerJobMiniClusterFactory initializeMiniCluster(Configuration configuration) {
156198
return PerJobMiniClusterFactory.createWithFactory(
157-
new Configuration(),
199+
configuration,
158200
config -> {
159201
miniCluster = new MiniCluster(config);
160202
return miniCluster;
@@ -175,4 +217,32 @@ private static JobGraph getCancellableJobGraph() {
175217
jobVertex.setParallelism(1);
176218
return JobGraphTestUtils.streamingJobGraph(jobVertex);
177219
}
220+
221+
/**
 * Builds a vertex named "jobVertex" that runs {@link BlockingInvokable} with a declared
 * parallelism of 2.
 *
 * @return the newly created job vertex
 */
private static JobVertex getBlockingJobVertex() {
222+
JobVertex jobVertex = new JobVertex("jobVertex");
223+
jobVertex.setInvokableClass(BlockingInvokable.class);
224+
jobVertex.setParallelism(2);
225+
return jobVertex;
226+
}
227+
228+
/**
 * Test invokable that allows waiting for all subtasks to be running.
 *
 * <p>Each subtask counts down a shared static latch when it is invoked. A test calls {@link
 * #reset(int)} with the expected number of subtasks before submitting the job and then awaits
 * the latch.
 */
229+
public static class BlockingInvokable extends CancelableInvokable {
230+
231+
// Static, so the single latch is shared by every instance of this invokable.
// It stays null until reset(int) is called.
private static CountDownLatch latch;
232+
233+
public BlockingInvokable(Environment environment) {
234+
super(environment);
235+
}
236+
237+
@Override
238+
public void doInvoke() throws Exception {
239+
// Fail fast if the test forgot to arm the latch before running the job.
checkState(latch != null, "The invokable should be reset first.");
240+
// Signal that this subtask has started.
latch.countDown();
241+
// Presumably blocks until the task is cancelled (inherited helper, not shown here).
waitUntilCancelled();
242+
}
243+
244+
/**
 * Replaces the shared latch with a new one.
 *
 * @param count number of {@code doInvoke} calls required before the latch opens
 */
public static void reset(int count) {
245+
latch = new CountDownLatch(count);
246+
}
247+
}
178248
}

0 commit comments

Comments
 (0)