Skip to content

Commit

Permalink
[HUDI-8315] update SqlKeyGenerator#convertPartitionPathToSqlType to handle default_partition_path (#12621)
Browse files Browse the repository at this point in the history
  • Loading branch information
karthick-de-25 authored Jan 19, 2025
1 parent 0178b12 commit d812eec
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.joda.time.format.DateTimeFormat
import org.apache.hudi.common.util.PartitionPathEncodeUtils

import java.sql.Timestamp
import java.util
Expand Down Expand Up @@ -152,28 +153,33 @@ class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props)
// in this case.
if (partitionFragments.size != partitionSchema.get.size) {
partitionPath
} else {
}
else {
partitionFragments.zip(partitionSchema.get.fields).map {
case (partitionValue, partitionField) =>
val hiveStylePrefix = s"${partitionField.name}="
val isHiveStyle = partitionValue.startsWith(hiveStylePrefix)
val _partitionValue = if (isHiveStyle) partitionValue.substring(hiveStylePrefix.length) else partitionValue

partitionField.dataType match {
case TimestampType =>
val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
} else {
if (isConsistentLogicalTimestampEnabled) {
Timestamp.valueOf(_partitionValue).getTime
if (_partitionValue == PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH) {
partitionValue
}
else {
partitionField.dataType match {
case TimestampType =>
val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
} else {
MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
if (isConsistentLogicalTimestampEnabled) {
Timestamp.valueOf(_partitionValue).getTime
} else {
MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
}
}
}
val timestampFormat = PartitionPathEncodeUtils.escapePathName(
SqlKeyGenerator.timestampTimeFormat.print(timeMs))
if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
case _ => partitionValue
val timestampFormat = PartitionPathEncodeUtils.escapePathName(
SqlKeyGenerator.timestampTimeFormat.print(timeMs))
if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat
case _ => partitionValue
}
}
}.mkString(KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,41 @@ class TestSparkSqlWithTimestampKeyGenerator extends HoodieSparkSqlTestBase {
}
}
}
// Writes two rows into a Hudi COPY_ON_WRITE table partitioned by a TIMESTAMP column: one with a
// concrete timestamp and one whose partition value is null. It then checks that both rows can be
// read back unchanged. NOTE(review): per the test name, the null row is expected to be stored
// under the default partition path; this test only asserts on the query result, not on the
// physical partition layout.
test("Test Spark SQL with default partition path for timestamp key generator") {
  withTempDir { tmp =>
    spark.sql(
      s"""
         | CREATE TABLE test_default_path_ts (
         | id int,
         | name string,
         | precomb long,
         | ts TIMESTAMP
         | ) USING HUDI
         | LOCATION '${tmp.getCanonicalPath}/test_default_path_ts'
         | PARTITIONED BY (ts)
         | TBLPROPERTIES (
         | type = 'COPY_ON_WRITE',
         | primaryKey = 'id',
         | preCombineField = 'precomb'
         | )
         |""".stripMargin)

    // Each row is inserted with its own statement so the null-partition write is a separate commit.
    val rowWithTimestamp = "(1, 'a1', 1,TIMESTAMP '2025-01-15 01:02:03')"
    val rowWithNullPartition = "(2, 'a3', 1, null)"
    val expectedQueryResult: String = "[1,a1,1,2025-01-15 01:02:03.0]; [2,a3,1,null]"

    spark.sql(s"INSERT INTO test_default_path_ts VALUES $rowWithTimestamp")
    // Insert a row whose partition column (ts) is null.
    spark.sql(s"INSERT INTO test_default_path_ts VALUES $rowWithNullPartition")

    // ORDER BY id keeps the row order deterministic for the string comparison below.
    val queryResult = spark.sql("SELECT id, name, precomb, ts FROM test_default_path_ts ORDER BY id")
      .collect()
      .mkString("; ")
    LOG.warn(s"Query result: $queryResult")
    assertResult(expectedQueryResult)(queryResult)
  }
}

test("Test mandatory partitioning for timestamp key generator") {
withTempDir { tmp =>
Expand Down

0 comments on commit d812eec

Please sign in to comment.