
Commit 5d5866b

MaxGekk authored and cloud-fan committed
[SPARK-31672][SQL] Fix loading of timestamps before 1582-10-15 from dictionary encoded Parquet columns
### What changes were proposed in this pull request?
Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle `TimestampType` specially when the passed parameter `rebaseDateTime` is true. In that case, decoded milliseconds/microseconds are rebased from the hybrid calendar to the Proleptic Gregorian calendar using `RebaseDateTime.rebaseJulianToGregorianMicros()`.

### Why are the changes needed?
This fixes a bug in loading timestamps before the cutover day from dictionary encoded columns in Parquet files. The code below forces dictionary encoding:
```scala
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> Seq.tabulate(8)(_ => "1001-01-01 01:02:03.123").toDF("tsS")
  .select($"tsS".cast("timestamp").as("ts")).repartition(1)
  .write
  .option("parquet.enable.dictionary", true)
  .parquet(path)
```
Load the timestamps back:
```scala
scala> spark.read.parquet(path).show(false)
+-----------------------+
|ts                     |
+-----------------------+
|1001-01-07 00:32:20.123|
...
|1001-01-07 00:32:20.123|
+-----------------------+
```
The expected values **must be 1001-01-01 01:02:03.123**, not 1001-01-07 00:32:20.123.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes:
```scala
scala> spark.read.parquet(path).show(false)
+-----------------------+
|ts                     |
+-----------------------+
|1001-01-01 01:02:03.123|
...
|1001-01-01 01:02:03.123|
+-----------------------+
```

### How was this patch tested?
Modified the test `SPARK-31159: rebasing timestamps in write` in `ParquetIOSuite` to check reading of dictionary encoded timestamps.

Closes apache#28489 from MaxGekk/fix-ts-rebase-parquet-dict-enc.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
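Editor's note: for readers unfamiliar with the two calendars involved, the sketch below (not part of this commit; plain JDK APIs only) shows why a rebase is needed at all: the same wall-clock date 1001-01-01 lands on different epoch days in the hybrid Julian/Gregorian calendar (used by Spark 2.4 and the legacy Parquet write path) and in the Proleptic Gregorian calendar (used internally by Spark 3.0).
```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Proleptic Gregorian calendar (java.time): what Spark 3.0 uses internally.
val prolepticDays = LocalDate.of(1001, 1, 1).toEpochDay

// Hybrid Julian/Gregorian calendar (java.util.GregorianCalendar): Julian rules
// apply before the 1582-10-15 cutover, matching the legacy write semantics.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1001, Calendar.JANUARY, 1)
val hybridDays = Math.floorDiv(cal.getTimeInMillis, 86400000L)

// The hybrid calendar's 1001-01-01 falls 6 days later on the epoch timeline,
// which is the 1001-01-01 vs 1001-01-07 shift seen in the example above
// (the extra minutes there come from historical time zone offsets).
println(s"proleptic=$prolepticDays hybrid=$hybridDays shift=${hybridDays - prolepticDays} days")
```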
1 parent 9f768fa commit 5d5866b

File tree

2 files changed: +64 −32 lines


sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

+27-4
@@ -159,7 +159,11 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName
         isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
         break;
       case INT64:
-        isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        if (originalType == OriginalType.TIMESTAMP_MICROS) {
+          isSupported = !rebaseDateTime;
+        } else {
+          isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        }
         break;
       case FLOAT:
       case DOUBLE:
@@ -313,17 +317,36 @@ private void decodeDictionaryIds(
       case INT64:
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType()) ||
-            originalType == OriginalType.TIMESTAMP_MICROS) {
+            (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
             }
           }
         } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+          if (rebaseDateTime) {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                long julianMicros = DateTimeUtils.millisToMicros(julianMillis);
+                long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+                column.putLong(i, gregorianMicros);
+              }
+            }
+          } else {
+            for (int i = rowId; i < rowId + num; ++i) {
+              if (!column.isNullAt(i)) {
+                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+                column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis));
+              }
+            }
+          }
+        } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
-              column.putLong(i,
-                DateTimeUtils.millisToMicros(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
+              long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+              long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
+              column.putLong(i, gregorianMicros);
             }
           }
         } else {
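Editor's note: as a rough illustration of the new read path, the sketch below (Scala, not code from this commit) mirrors what the Java branches above now do to each dictionary-decoded `TIMESTAMP_MILLIS` value when `rebaseDateTime` is true, using the same internal `org.apache.spark.sql.catalyst.util` helpers the reader calls; it assumes those internals are on the classpath.
```scala
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime}

// Sketch only: handles one dictionary entry. `julianMillis` stands in for
// dictionary.decodeToLong(dictionaryIds.getDictId(i)) in the reader.
def rebaseDictionaryValue(julianMillis: Long): Long = {
  // Convert the legacy millisecond value to microseconds first...
  val julianMicros = DateTimeUtils.millisToMicros(julianMillis)
  // ...then shift it from the hybrid Julian/Gregorian calendar to the
  // Proleptic Gregorian calendar before it is stored in the column vector.
  RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros)
}
```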

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

+37-28
@@ -937,37 +937,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-31159: rebasing timestamps in write") {
-    Seq(
-      ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
-      ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
-      ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
-    ).foreach { case (outType, tsStr, nonRebased) =>
-      withClue(s"output type $outType") {
-        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
-          withTempPath { dir =>
-            val path = dir.getAbsolutePath
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
-              Seq(tsStr).toDF("tsS")
-                .select($"tsS".cast("timestamp").as("ts"))
-                .write
-                .parquet(path)
-            }
+    val N = 8
+    Seq(false, true).foreach { dictionaryEncoding =>
+      Seq(
+        ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+        ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+        ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+      ).foreach { case (outType, tsStr, nonRebased) =>
+        withClue(s"output type $outType") {
+          withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+            withTempPath { dir =>
+              val path = dir.getAbsolutePath
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+                Seq.tabulate(N)(_ => tsStr).toDF("tsS")
+                  .select($"tsS".cast("timestamp").as("ts"))
+                  .repartition(1)
+                  .write
+                  .option("parquet.enable.dictionary", dictionaryEncoding)
+                  .parquet(path)
+              }
 
-            Seq(false, true).foreach { vectorized =>
-              withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-                // The file metadata indicates if it needs rebase or not, so we can always get the
-                // correct result regardless of the "rebaseInRead" config.
-                Seq(true, false).foreach { rebase =>
-                  withSQLConf(
-                    SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
-                    checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+              Seq(false, true).foreach { vectorized =>
+                withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+                  // The file metadata indicates if it needs rebase or not, so we can always get the
+                  // correct result regardless of the "rebaseInRead" config.
+                  Seq(true, false).foreach { rebase =>
+                    withSQLConf(
+                      SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                      checkAnswer(
+                        spark.read.parquet(path),
+                        Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr))))
+                    }
                   }
-                }
 
-                // Force to not rebase to prove the written datetime values are rebased
-                // and we will get wrong result if we don't rebase while reading.
-                withSQLConf("spark.test.forceNoRebase" -> "true") {
-                  checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+                  // Force to not rebase to prove the written datetime values are rebased
+                  // and we will get wrong result if we don't rebase while reading.
+                  withSQLConf("spark.test.forceNoRebase" -> "true") {
+                    checkAnswer(
+                      spark.read.parquet(path),
+                      Seq.tabulate(N)(_ => Row(Timestamp.valueOf(nonRebased))))
+                  }
                 }
               }
             }
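Editor's note: outside the test harness, the user-facing effect can be checked with a condensed version of the reproducer from the description. This is a spark-shell sketch: the output path is hypothetical, and the legacy conf names are those in use at the time of this commit (they were renamed in later releases).
```scala
import java.sql.Timestamp
import spark.implicits._

val path = "/tmp/ts-dict-parquet"  // hypothetical output location

spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

// Repeating one value and writing a single file makes Parquet dictionary-encode the column.
Seq.tabulate(8)(_ => "1001-01-01 01:02:03.123456").toDF("tsS")
  .select($"tsS".cast("timestamp").as("ts"))
  .repartition(1)
  .write
  .option("parquet.enable.dictionary", true)
  .mode("overwrite")
  .parquet(path)

// With the fix, both the vectorized and the parquet-mr readers return the original timestamps.
Seq(true, false).foreach { vectorized =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  assert(spark.read.parquet(path).collect().forall(
    _.getTimestamp(0) == Timestamp.valueOf("1001-01-01 01:02:03.123456")))
}
```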
