Skip to content

Commit d550b46

Browse files
authored
[HUDI-8070] Support Flink 1.19 (apache#11779)
1 parent e1f70fd commit d550b46

File tree

48 files changed

+4867
-32
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4867
-32
lines changed

.github/workflows/bot.yml

+12-7
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ jobs:
173173
include:
174174
- scalaProfile: "scala-2.12"
175175
sparkProfile: "spark3.5"
176-
flinkProfile: "flink1.18"
176+
flinkProfile: "flink1.19"
177177

178178
steps:
179179
- uses: actions/checkout@v3
@@ -420,6 +420,7 @@ jobs:
420420
- flinkProfile: "flink1.16"
421421
- flinkProfile: "flink1.17"
422422
- flinkProfile: "flink1.18"
423+
- flinkProfile: "flink1.19"
423424
steps:
424425
- uses: actions/checkout@v3
425426
- name: Set up JDK 8
@@ -456,15 +457,15 @@ jobs:
456457
matrix:
457458
include:
458459
- scalaProfile: 'scala-2.13'
459-
flinkProfile: 'flink1.18'
460+
flinkProfile: 'flink1.19'
460461
sparkProfile: 'spark3.5'
461462
sparkRuntime: 'spark3.5.0'
462463
- scalaProfile: 'scala-2.12'
463-
flinkProfile: 'flink1.18'
464+
flinkProfile: 'flink1.19'
464465
sparkProfile: 'spark3.5'
465466
sparkRuntime: 'spark3.5.0'
466467
- scalaProfile: 'scala-2.12'
467-
flinkProfile: 'flink1.18'
468+
flinkProfile: 'flink1.19'
468469
sparkProfile: 'spark3.4'
469470
sparkRuntime: 'spark3.4.0'
470471

@@ -493,6 +494,10 @@ jobs:
493494
strategy:
494495
matrix:
495496
include:
497+
- scalaProfile: 'scala-2.13'
498+
flinkProfile: 'flink1.19'
499+
sparkProfile: 'spark3.5'
500+
sparkRuntime: 'spark3.5.1'
496501
- scalaProfile: 'scala-2.13'
497502
flinkProfile: 'flink1.18'
498503
sparkProfile: 'spark3.5'
@@ -570,11 +575,11 @@ jobs:
570575
matrix:
571576
include:
572577
- scalaProfile: 'scala-2.13'
573-
flinkProfile: 'flink1.18'
578+
flinkProfile: 'flink1.19'
574579
sparkProfile: 'spark3.5'
575580
sparkRuntime: 'spark3.5.0'
576581
- scalaProfile: 'scala-2.12'
577-
flinkProfile: 'flink1.18'
582+
flinkProfile: 'flink1.19'
578583
sparkProfile: 'spark3.5'
579584
sparkRuntime: 'spark3.5.0'
580585
steps:
@@ -713,7 +718,7 @@ jobs:
713718
matrix:
714719
include:
715720
- scalaProfile: "scala-2.12"
716-
flinkProfile: "flink1.18"
721+
flinkProfile: "flink1.19"
717722

718723
steps:
719724
- uses: actions/checkout@v3

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.sink.utils;
2020

21+
import org.apache.hudi.adapter.CollectOutputAdapter;
2122
import org.apache.hudi.adapter.TestStreamConfigs;
2223
import org.apache.hudi.configuration.FlinkOptions;
2324
import org.apache.hudi.configuration.OptionsResolver;
@@ -79,7 +80,7 @@ public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
7980
private MapFunction<RowData, RowData> mapFunction;
8081
private Map<String, String> bucketIdToFileId;
8182
private SortOperator sortOperator;
82-
private CollectorOutput<RowData> output;
83+
private CollectOutputAdapter<RowData> output;
8384

8485
public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
8586
ioManager = new IOManagerAsync();
@@ -227,7 +228,7 @@ private void setupSortOperator() throws Exception {
227228
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
228229
this.sortOperator = (SortOperator) sortOperatorGen.createSortOperator(conf);
229230
this.sortOperator.setProcessingTimeService(new TestProcessingTimeService());
230-
this.output = new CollectorOutput<>();
231+
this.output = new CollectOutputAdapter<>();
231232
StreamConfig streamConfig = new StreamConfig(conf);
232233
streamConfig.setOperatorID(new OperatorID());
233234
RowDataSerializer inputSerializer = new RowDataSerializer(rowTypeWithFileId);

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.sink.utils;
2020

21+
import org.apache.hudi.adapter.CollectOutputAdapter;
2122
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
2223
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
2324
import org.apache.hudi.sink.clustering.ClusteringOperator;
@@ -58,11 +59,11 @@ public class ClusteringFunctionWrapper {
5859
/**
5960
* Output to collect the clustering plan events.
6061
*/
61-
private CollectorOutput<ClusteringPlanEvent> planEventOutput;
62+
private CollectOutputAdapter<ClusteringPlanEvent> planEventOutput;
6263
/**
6364
* Output to collect the clustering commit events.
6465
*/
65-
private CollectorOutput<ClusteringCommitEvent> commitEventOutput;
66+
private CollectOutputAdapter<ClusteringCommitEvent> commitEventOutput;
6667
/**
6768
* Function that executes the clustering task.
6869
*/
@@ -87,14 +88,14 @@ public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask
8788

8889
public void openFunction() throws Exception {
8990
clusteringPlanOperator = new ClusteringPlanOperator(conf);
90-
planEventOutput = new CollectorOutput<>();
91+
planEventOutput = new CollectOutputAdapter<>();
9192
clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput);
9293
clusteringPlanOperator.open();
9394

9495
clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);
9596
// CAUTION: deprecated API used.
9697
clusteringOperator.setProcessingTimeService(new TestProcessingTimeService());
97-
commitEventOutput = new CollectorOutput<>();
98+
commitEventOutput = new CollectOutputAdapter<>();
9899
clusteringOperator.setup(streamTask, streamConfig, commitEventOutput);
99100
clusteringOperator.open();
100101
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
@@ -108,7 +109,7 @@ public void openFunction() throws Exception {
108109

109110
public void cluster(long checkpointID) throws Exception {
110111
// collect the ClusteringPlanEvents.
111-
CollectorOutput<ClusteringPlanEvent> planOutput = new CollectorOutput<>();
112+
CollectOutputAdapter<ClusteringPlanEvent> planOutput = new CollectOutputAdapter<>();
112113
clusteringPlanOperator.setOutput(planOutput);
113114
clusteringPlanOperator.notifyCheckpointComplete(checkpointID);
114115
// collect the ClusteringCommitEvents

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.sink.utils;
2020

21+
import org.apache.hudi.adapter.CollectOutputAdapter;
2122
import org.apache.hudi.avro.model.HoodieCompactionPlan;
2223
import org.apache.hudi.sink.compact.CompactOperator;
2324
import org.apache.hudi.sink.compact.CompactionCommitEvent;
@@ -58,11 +59,11 @@ public class CompactFunctionWrapper {
5859
/**
5960
* Output to collect the compaction plan events.
6061
*/
61-
private CollectorOutput<CompactionPlanEvent> planEventOutput;
62+
private CollectOutputAdapter<CompactionPlanEvent> planEventOutput;
6263
/**
6364
* Output to collect the compaction commit events.
6465
*/
65-
private CollectorOutput<CompactionCommitEvent> commitEventOutput;
66+
private CollectOutputAdapter<CompactionCommitEvent> commitEventOutput;
6667
/**
6768
* Function that executes the compaction task.
6869
*/
@@ -87,14 +88,14 @@ public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, S
8788

8889
public void openFunction() throws Exception {
8990
compactionPlanOperator = new CompactionPlanOperator(conf);
90-
planEventOutput = new CollectorOutput<>();
91+
planEventOutput = new CollectOutputAdapter<>();
9192
compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
9293
compactionPlanOperator.open();
9394

9495
compactOperator = new CompactOperator(conf);
9596
// CAUTION: deprecated API used.
9697
compactOperator.setProcessingTimeService(new TestProcessingTimeService());
97-
commitEventOutput = new CollectorOutput<>();
98+
commitEventOutput = new CollectOutputAdapter<>();
9899
compactOperator.setup(streamTask, streamConfig, commitEventOutput);
99100
compactOperator.open();
100101
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.sink.utils;
2020

21+
import org.apache.hudi.adapter.CollectOutputAdapter;
2122
import org.apache.hudi.common.model.HoodieKey;
2223
import org.apache.hudi.common.model.HoodieRecord;
2324
import org.apache.hudi.configuration.FlinkOptions;
@@ -147,7 +148,7 @@ public void openFunction() throws Exception {
147148

148149
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
149150
bootstrapOperator = new BootstrapOperator<>(conf);
150-
CollectorOutput<HoodieRecord<?>> output = new CollectorOutput<>();
151+
CollectOutputAdapter<HoodieRecord<?>> output = new CollectOutputAdapter<>();
151152
bootstrapOperator.setup(streamTask, streamConfig, output);
152153
bootstrapOperator.initializeState(this.stateInitializationContext);
153154

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CollectorOutput.java hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/CollectOutputAdapter.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
99
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
10+
* http://www.apache.org/licenses/LICENSE-2.0
1111
*
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hudi.sink.utils;
19+
package org.apache.hudi.adapter;
2020

2121
import org.apache.flink.streaming.api.operators.Output;
2222
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -29,13 +29,13 @@
2929
import java.util.List;
3030

3131
/**
32-
* Collecting {@link Output} for {@link StreamRecord}.
32+
* Adapter clazz for {@code Output}.
3333
*/
34-
public class CollectorOutput<T> implements Output<StreamRecord<T>> {
34+
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {
3535

3636
private final List<T> records;
3737

38-
public CollectorOutput() {
38+
public CollectOutputAdapter() {
3939
this.records = new ArrayList<>();
4040
}
4141

@@ -72,4 +72,4 @@ public void close() {
7272
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
7373
// no operation
7474
}
75-
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.adapter;
20+
21+
import org.apache.flink.streaming.api.operators.Output;
22+
import org.apache.flink.streaming.api.watermark.Watermark;
23+
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
24+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
25+
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
26+
import org.apache.flink.util.OutputTag;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Adapter clazz for {@code Output}.
33+
*/
34+
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {
35+
36+
private final List<T> records;
37+
38+
public CollectOutputAdapter() {
39+
this.records = new ArrayList<>();
40+
}
41+
42+
public List<T> getRecords() {
43+
return this.records;
44+
}
45+
46+
@Override
47+
public void emitWatermark(Watermark mark) {
48+
// no operation
49+
}
50+
51+
@Override
52+
public void emitLatencyMarker(LatencyMarker latencyMarker) {
53+
// no operation
54+
}
55+
56+
@Override
57+
public void collect(StreamRecord<T> record) {
58+
records.add(record.getValue());
59+
}
60+
61+
@Override
62+
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
63+
throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
64+
}
65+
66+
@Override
67+
public void close() {
68+
this.records.clear();
69+
}
70+
71+
@Override
72+
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
73+
// no operation
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.adapter;
20+
21+
import org.apache.flink.streaming.api.operators.Output;
22+
import org.apache.flink.streaming.api.watermark.Watermark;
23+
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
24+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
25+
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
26+
import org.apache.flink.util.OutputTag;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Adapter clazz for {@code Output}.
33+
*/
34+
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {
35+
36+
private final List<T> records;
37+
38+
public CollectOutputAdapter() {
39+
this.records = new ArrayList<>();
40+
}
41+
42+
public List<T> getRecords() {
43+
return this.records;
44+
}
45+
46+
@Override
47+
public void emitWatermark(Watermark mark) {
48+
// no operation
49+
}
50+
51+
@Override
52+
public void emitLatencyMarker(LatencyMarker latencyMarker) {
53+
// no operation
54+
}
55+
56+
@Override
57+
public void collect(StreamRecord<T> record) {
58+
records.add(record.getValue());
59+
}
60+
61+
@Override
62+
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
63+
throw new UnsupportedOperationException("Side output not supported for CollectorOutput");
64+
}
65+
66+
@Override
67+
public void close() {
68+
this.records.clear();
69+
}
70+
71+
@Override
72+
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
73+
// no operation
74+
}
75+
}

0 commit comments

Comments
 (0)