
Commit 46d2a30

[HUDI-7958] Create partition stats index for all columns when no cols specified (apache#11579)
1 parent d7eb992 commit 46d2a30

8 files changed: +163 -71 lines

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java (-4)

@@ -406,10 +406,6 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
         fileGroupCountAndRecordsPair = initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
         break;
       case PARTITION_STATS:
-        if (dataWriteConfig.getColumnsEnabledForColumnStatsIndex().isEmpty()) {
-          LOG.warn("Skipping partition stats index initialization as target columns are not set");
-          continue;
-        }
         fileGroupCountAndRecordsPair = initializePartitionStatsIndex(partitionInfoList);
         break;
       case SECONDARY_INDEX:
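The effect of this removal, as a minimal standalone sketch (the enum and method names below are illustrative stand-ins, not Hudi's API): the PARTITION_STATS branch now always initializes, and column selection is deferred to the stats-conversion path shown in the HoodieTableMetadataUtil hunk further down.

    // Hedged model of the initialization dispatch after this change.
    public class InitDispatchSketch {
      enum PartitionType { FUNCTIONAL_INDEX, PARTITION_STATS, SECONDARY_INDEX }

      static String initialize(PartitionType type) {
        switch (type) {
          case PARTITION_STATS:
            // Previously skipped with a warning when the configured column list
            // was empty; now it always initializes, and an empty list means
            // "derive columns from the table schema" downstream.
            return "initializePartitionStatsIndex";
          case FUNCTIONAL_INDEX:
            return "initializeFunctionalIndexPartition";
          default:
            return "other";
        }
      }

      public static void main(String[] args) {
        System.out.println(initialize(PartitionType.PARTITION_STATS));
      }
    }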

hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java (+1, -1)

@@ -332,7 +332,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
       .key(METADATA_PREFIX + ".index.partition.stats.enable")
-      .defaultValue(true)
+      .defaultValue(false)
       .sinceVersion("1.0.0")
       .withDocumentation("Enable aggregating stats for each column at the storage partition level.");
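With the default flipped to false, the partition stats index becomes opt-in. A minimal sketch of enabling it through write options, assuming METADATA_PREFIX resolves to "hoodie.metadata" and that "hoodie.metadata.enable" is the metadata-table toggle (both stated from general Hudi knowledge, not this diff):

    import java.util.HashMap;
    import java.util.Map;

    public class EnablePartitionStatsExample {
      public static void main(String[] args) {
        Map<String, String> writeOptions = new HashMap<>();
        // Assumed key for the metadata table itself.
        writeOptions.put("hoodie.metadata.enable", "true");
        // Key from the diff: METADATA_PREFIX + ".index.partition.stats.enable".
        // No target-column list is needed anymore; when none is specified,
        // all eligible columns are indexed (the point of HUDI-7958).
        writeOptions.put("hoodie.metadata.index.partition.stats.enable", "true");
        writeOptions.forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }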

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java (+10, -6)

@@ -212,8 +212,8 @@ protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter metadataBl
     this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, metadataBloomFilter, null, null, null);
   }
 
-  protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats columnStats) {
-    this(key, MetadataPartitionType.COLUMN_STATS.getRecordType(), null, null, columnStats, null, null);
+  protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats columnStats, int recordType) {
+    this(key, recordType, null, null, columnStats, null, null);
   }
 
   private HoodieMetadataPayload(String key, HoodieRecordIndexInfo recordIndexMetadata) {

@@ -482,7 +482,8 @@ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName
       HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata),
           MetadataPartitionType.COLUMN_STATS.getPartitionPath());
 
-      HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(),
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(
+          key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
               .setFileName(new StoragePath(columnRangeMetadata.getFilePath()).getName())
               .setColumnName(columnRangeMetadata.getColumnName())

@@ -493,7 +494,8 @@ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName
               .setTotalSize(columnRangeMetadata.getTotalSize())
               .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
-              .build());
+              .build(),
+          MetadataPartitionType.COLUMN_STATS.getRecordType());
 
       return new HoodieAvroRecord<>(key, payload);
     });

@@ -505,7 +507,8 @@ public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionP
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, columnRangeMetadata.getColumnName()),
           MetadataPartitionType.PARTITION_STATS.getPartitionPath());
-      HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(),
+      HoodieMetadataPayload payload = new HoodieMetadataPayload(
+          key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
               .setFileName(columnRangeMetadata.getFilePath())
               .setColumnName(columnRangeMetadata.getColumnName())

@@ -516,7 +519,8 @@ public static Stream<HoodieRecord> createPartitionStatsRecords(String partitionP
               .setTotalSize(columnRangeMetadata.getTotalSize())
               .setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
               .setIsDeleted(isDeleted)
-              .build());
+              .build(),
+          MetadataPartitionType.PARTITION_STATS.getRecordType());
 
       return new HoodieAvroRecord<>(key, payload);
     });
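A standalone model of why the record type is now threaded through the constructor: column stats and partition stats share the same payload shape, so each record must carry the type of the metadata partition that owns it to stay distinguishable when payloads are combined. Class and field names below are illustrative; the PARTITION_STATS ordinal 6 appears in the MetadataPartitionType diff further down, while the COLUMN_STATS ordinal is assumed.

    public class RecordTypeTaggingSketch {
      static final int COLUMN_STATS_RECORD_TYPE = 3;     // assumed for illustration
      static final int PARTITION_STATS_RECORD_TYPE = 6;  // per the enum diff below

      static class StatsPayload {
        final String key;
        final int recordType;

        StatsPayload(String key, int recordType) {
          this.key = key;
          this.recordType = recordType;
        }

        // Merging preserves the record type of the owning partition,
        // mirroring the new three-argument constructor.
        StatsPayload combine(StatsPayload newer) {
          return new StatsPayload(newer.key, this.recordType);
        }
      }

      public static void main(String[] args) {
        StatsPayload colStats = new StatsPayload("file1.parquet+col_a", COLUMN_STATS_RECORD_TYPE);
        StatsPayload partStats = new StatsPayload("2024-01-01+col_a", PARTITION_STATS_RECORD_TYPE);
        System.out.println(colStats.combine(colStats).recordType);   // 3
        System.out.println(partStats.combine(partStats).recordType); // 6
      }
    }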

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java (+7, -1)

@@ -716,6 +716,7 @@ public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(Hoodi
 
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
+      LOG.warn("No columns to index for column stats index.");
       return engineContext.emptyHoodieData();
     }
 

@@ -932,6 +933,7 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
         Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
+      LOG.warn("No columns to index for column stats index.");
       return engineContext.emptyHoodieData();
     }
 

@@ -2046,8 +2048,12 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
                                                                              List<DirectoryInfo> partitionInfoList,
                                                                              HoodieMetadataConfig metadataConfig,
                                                                              HoodieTableMetaClient dataTableMetaClient) {
-    final List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex();
+    final List<String> columnsToIndex = getColumnsToIndex(
+        metadataConfig.isPartitionStatsIndexEnabled(),
+        metadataConfig.getColumnsEnabledForColumnStatsIndex(),
+        Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
     if (columnsToIndex.isEmpty()) {
+      LOG.warn("No columns to index for partition stats index");
       return engineContext.emptyHoodieData();
     }
     LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
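The new getColumnsToIndex call is the heart of HUDI-7958: an empty user-configured column list now falls back to the table schema instead of disabling the index. A standalone sketch of that selection logic (the real Hudi helper has a different signature; this models only the behavior implied by the diff and the commit title):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Supplier;

    public class ColumnsToIndexSketch {
      static List<String> getColumnsToIndex(boolean indexEnabled,
                                            List<String> configuredColumns,
                                            Supplier<List<String>> lazySchemaFields) {
        if (!indexEnabled) {
          return List.of();
        }
        // Key behavioral change: an empty column list no longer disables the
        // index; it means "index every column in the schema".
        return configuredColumns.isEmpty() ? lazySchemaFields.get() : configuredColumns;
      }

      public static void main(String[] args) {
        Supplier<List<String>> schema = () -> Arrays.asList("id", "name", "price", "ts");
        System.out.println(getColumnsToIndex(true, List.of(), schema));        // all columns
        System.out.println(getColumnsToIndex(true, List.of("price"), schema)); // explicit list
      }
    }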

hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java (+16, -6)

@@ -39,16 +39,12 @@
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
-import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -123,7 +119,7 @@ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload older
       HoodieMetadataColumnStats previousColStatsRecord = older.getColumnStatMetadata().get();
       HoodieMetadataColumnStats newColumnStatsRecord = newer.getColumnStatMetadata().get();
 
-      return new HoodieMetadataPayload(newer.key, mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord));
+      return new HoodieMetadataPayload(newer.key, mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord), getRecordType());
     }
   },
   BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS, "bloom-filters-", 4) {

@@ -235,13 +231,24 @@ public String getPartitionPath(HoodieTableMetaClient metaClient, String indexNam
   PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, "partition-stats-", 6) {
     @Override
     public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
-      return getBooleanWithAltKeys(writeConfig, ENABLE_METADATA_INDEX_PARTITION_STATS) && nonEmpty(getStringWithAltKeys(writeConfig, COLUMN_STATS_INDEX_FOR_COLUMNS, EMPTY_STRING));
+      return getBooleanWithAltKeys(writeConfig, ENABLE_METADATA_INDEX_PARTITION_STATS);
     }
 
     @Override
     public void constructMetadataPayload(HoodieMetadataPayload payload, GenericRecord record) {
       constructColumnStatsMetadataPayload(payload, record);
     }
+
+    @Override
+    public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload older, HoodieMetadataPayload newer) {
+      checkArgument(older.getColumnStatMetadata().isPresent());
+      checkArgument(newer.getColumnStatMetadata().isPresent());
+
+      HoodieMetadataColumnStats previousColStatsRecord = older.getColumnStatMetadata().get();
+      HoodieMetadataColumnStats newColumnStatsRecord = newer.getColumnStatMetadata().get();
+
+      return new HoodieMetadataPayload(newer.key, mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord), getRecordType());
+    }
   },
   // ALL_PARTITIONS is just another record type in FILES partition
   ALL_PARTITIONS(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1) {

@@ -411,6 +418,9 @@ public static MetadataPartitionType[] getValidValues() {
    * Returns the list of metadata partition types enabled based on the metadata config and table config.
    */
   public static List<MetadataPartitionType> getEnabledPartitions(TypedProperties writeConfig, HoodieTableMetaClient metaClient) {
+    if (!getBooleanWithAltKeys(writeConfig, ENABLE)) {
+      return Collections.emptyList();
+    }
     return Arrays.stream(getValidValues())
         .filter(partitionType -> partitionType.isMetadataPartitionEnabled(writeConfig) || partitionType.isMetadataPartitionAvailable(metaClient))
         .collect(Collectors.toList());
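A standalone model of the new guard in getEnabledPartitions: when the metadata table itself is disabled, no partition type is reported enabled, regardless of per-index flags or table state (the enum values and predicate below are illustrative stand-ins):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class EnabledPartitionsSketch {
      enum PartitionType { FILES, COLUMN_STATS, PARTITION_STATS, BLOOM_FILTERS }

      static List<PartitionType> getEnabledPartitions(boolean metadataEnabled,
                                                      Predicate<PartitionType> enabledOrAvailable) {
        if (!metadataEnabled) {
          return Collections.emptyList(); // the new short-circuit added by this commit
        }
        return Arrays.stream(PartitionType.values())
            .filter(enabledOrAvailable)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        Predicate<PartitionType> all = t -> true;
        System.out.println(getEnabledPartitions(false, all)); // []
        System.out.println(getEnabledPartitions(true, all));  // all types
      }
    }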

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java (+29, -14)

@@ -315,8 +315,9 @@ void testSyncMetadataTable() throws Exception {
     reset();
     // override the default configuration
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    int metadataCompactionDeltaCommits = 5;
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
-    conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+    conf.setInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, metadataCompactionDeltaCommits);
     OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();

@@ -332,23 +333,27 @@ void testSyncMetadataTable() throws Exception {
     final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), metadataTableBasePath);
     HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table compaction
-    // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    // write few more commits until compaction
+    int numCommits;
+    for (numCommits = metadataPartitions; numCommits < metadataCompactionDeltaCommits; numCommits++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
       completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-      assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1));
+      assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(numCommits + 1));
       assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
     }
     // the 5th commit triggers the compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(7));
+    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(numCommits + 2));
     assertThat(completedTimeline.nthFromLastInstant(0).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
     // write another 2 commits
     for (int i = 7; i < 8; i++) {

@@ -401,24 +406,28 @@ void testSyncMetadataTableWithLogCompaction() throws Exception {
     final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), metadataTableBasePath);
     HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // test metadata table log compaction
     // already 1 commit is used to initialized FILES partition in MDT
     // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    int numCommits;
+    for (numCommits = metadataPartitions; numCommits < metadataPartitions + 4; numCommits++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
       completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-      assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(i + 1));
+      assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(numCommits + 1));
       assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
     }
     // the 5th commit triggers the log compaction
     mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = metadataTableMetaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(7));
+    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(numCommits + 2));
     assertThat("The log compaction instant time should be new generated",
         completedTimeline.nthFromLastInstant(1).get().getTimestamp(), not(instant));
     // log compaction is another delta commit

@@ -447,7 +456,10 @@ void testSyncMetadataTableWithRollback() throws Exception {
     final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), metadataTableBasePath);
     HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     // writes a normal commit

@@ -464,7 +476,7 @@ void testSyncMetadataTableWithRollback() throws Exception {
     metadataTableMetaClient.reloadActiveTimeline();
 
     completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(4));
+    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(metadataPartitions + 3));
     assertThat(completedTimeline.nthFromLastInstant(1).get().getTimestamp(), is(instant));
     assertThat("The pending instant should be rolled back first",
         completedTimeline.lastInstant().get().getAction(), is(HoodieTimeline.ROLLBACK_ACTION));

@@ -530,13 +542,16 @@ void testLockForMetadataTable() throws Exception {
     final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
     HoodieTableMetaClient metadataTableMetaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), metadataTableBasePath);
     HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(1));
+    HoodieTableMetaClient dataTableMetaClient =
+        HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(conf)), new Path(metadataTableBasePath).getParent().getParent().toString());
+    int metadataPartitions = dataTableMetaClient.getTableConfig().getMetadataPartitions().size();
+    assertThat("Instants needed to sync to metadata table do not match", completedTimeline.countInstants(), is(metadataPartitions));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
 
     instant = mockWriteWithMetadata();
     metadataTableMetaClient.reloadActiveTimeline();
     completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
-    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(2));
+    assertThat("One instant need to sync to metadata table", completedTimeline.countInstants(), is(metadataPartitions + 1));
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
   }
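A rough model of the counting scheme these updated assertions encode, assuming the metadata-table bootstrap writes one instant per enabled metadata partition, each data commit adds one more, and a (log) compaction adds one on top (the helper is illustrative, not part of the test):

    public class MetadataInstantCountSketch {
      static int expectedInstants(int metadataPartitions, int commitsAfterBootstrap, boolean compacted) {
        return metadataPartitions + commitsAfterBootstrap + (compacted ? 1 : 0);
      }

      public static void main(String[] args) {
        // e.g. FILES partition only, 5 commits after bootstrap, the 5th
        // triggering compaction:
        System.out.println(expectedInstants(1, 5, true)); // 7, the old hard-coded value
      }
    }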
