Fix timestamp with timezone mapping in iceberg type converter #23534

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -1775,6 +1775,10 @@ Map of Iceberg types to the relevant PrestoDB types:
    - ``TIME``
  * - ``TIMESTAMP``
    - ``TIMESTAMP``
  * - ``TIMESTAMP``
    - ``TIMESTAMP_WITH_TIMEZONE``
  * - ``STRING``
    - ``VARCHAR``
Contributor:
I think we also need the mapping for the table below (prestodb -> iceberg)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm not sure about this. The table below contains:

  * - ``TIMESTAMP``
    - ``TIMESTAMP WITHOUT ZONE``
  * - ``TIMESTAMP WITH TIMEZONE``
    - ``TIMESTAMP WITH ZONE``

I suppose "with zone" and "without zone" just represent isAdjustedToUTC in this case? Either way, it looks like this part is covered. Maybe we should standardize the way we discuss these in the documentation, though.
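
For context, a minimal sketch of the distinction, using Iceberg's type API; only the adjust-to-UTC flag separates the two variants, and "timestamptz"/"timestamp" are how Iceberg renders them:

    import org.apache.iceberg.types.Types;

    public class TimestampVariants
    {
        public static void main(String[] args)
        {
            // Iceberg models both SQL types as TimestampType; zone awareness
            // is a flag, not a separate type class.
            Types.TimestampType tz = Types.TimestampType.withZone();
            Types.TimestampType plain = Types.TimestampType.withoutZone();

            System.out.println(tz + " -> " + tz.shouldAdjustToUTC());       // timestamptz -> true
            System.out.println(plain + " -> " + plain.shouldAdjustToUTC()); // timestamp -> false
        }
    }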

  * - ``UUID``
    - ``UUID``
  * - ``LIST``
@@ -43,9 +43,10 @@
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -899,8 +900,12 @@ public void testDecimalBackedByINT32()
 public void testTimestampMicrosBackedByINT64()
         throws Exception
 {
-    org.apache.parquet.schema.MessageType parquetSchema =
-            MessageTypeParser.parseMessageType("message ts_micros { optional INT64 test (TIMESTAMP_MICROS); }");
+    LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS);
+    MessageType parquetSchema = Types.buildMessage()
+            .primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL)
+            .as(annotation)
+            .named("test")
+            .named("ts_micros");
     ContiguousSet<Long> longValues = longsBetween(1_000_000, 1_001_000);
     ImmutableList.Builder<SqlTimestamp> expectedValues = new ImmutableList.Builder<>();
     for (Long value : longValues) {
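A note on the rewritten schema construction above: LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit) carries the same UTC-adjustment flag that separates Iceberg's timestamp from timestamptz. A minimal sketch of the two variants (class name is illustrative):

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

    public class TimestampAnnotations
    {
        public static void main(String[] args)
        {
            // false: not adjusted to UTC, the Parquet analogue of a plain timestamp,
            // which is what the test above builds for TIMESTAMP_MICROS data.
            LogicalTypeAnnotation plainMicros = LogicalTypeAnnotation.timestampType(false, TimeUnit.MICROS);

            // true: adjusted to UTC, the analogue of Iceberg's timestamptz.
            LogicalTypeAnnotation utcMicros = LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS);

            System.out.println(plainMicros); // TIMESTAMP(MICROS,false)
            System.out.println(utcMicros);   // TIMESTAMP(MICROS,true)
        }
    }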
@@ -30,6 +30,7 @@
 import com.facebook.presto.common.type.RowType;
 import com.facebook.presto.common.type.TimeType;
 import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TimestampWithTimeZoneType;
 import com.facebook.presto.common.type.Type;
 import com.facebook.presto.common.type.UuidType;
 import com.facebook.presto.common.type.VarbinaryType;
@@ -47,6 +48,7 @@
 import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE;
 import static com.facebook.presto.common.predicate.Marker.Bound.BELOW;
 import static com.facebook.presto.common.predicate.Marker.Bound.EXACTLY;
+import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
 import static com.facebook.presto.iceberg.IcebergColumnHandle.getPushedDownSubfield;
 import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield;
 import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield;
@@ -203,6 +205,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
         return MILLISECONDS.toMicros((Long) marker.getValue());
     }
 
+    if (type instanceof TimestampWithTimeZoneType) {
+        return MILLISECONDS.toMicros(unpackMillisUtc((Long) marker.getValue()));
+    }
+
     if (type instanceof VarcharType) {
         return ((Slice) marker.getValue()).toStringUtf8();
     }
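For background on the unpackMillisUtc call above: Presto encodes a TIMESTAMP WITH TIME ZONE value as a single long that packs UTC millis together with a time-zone key, while an Iceberg literal for timestamptz is plain microseconds since the epoch in UTC. A minimal sketch of that conversion, assuming the standard DateTimeEncoding helpers (the literal millis value is illustrative):

    import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone;
    import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    public class PackedTimestampDemo
    {
        public static void main(String[] args)
        {
            long millisUtc = 471341400000L; // 1984-12-08 08:10:00 UTC
            // Pack UTC millis plus a zone key into Presto's single-long representation.
            long packed = packDateTimeWithZone(millisUtc, "America/Los_Angeles");

            // Iceberg wants micros since epoch in UTC: drop the zone key, then rescale.
            long icebergMicros = MILLISECONDS.toMicros(unpackMillisUtc(packed));
            System.out.println(icebergMicros); // 471341400000000
        }
    }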
@@ -154,7 +154,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
 import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
 import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
-import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
 import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
 import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
 import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
@@ -692,10 +691,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle

         Type columnType = toIcebergType(column.getType());
 
-        if (columnType.equals(Types.TimestampType.withZone())) {
-            throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", columnType));
-        }
-
         IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
         verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added");
         Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
@@ -753,8 +748,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
         Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
         validateTableMode(session, icebergTable);
 
-        verifyTypeSupported(icebergTable.schema());
-
         return beginIcebergTableInsert(session, table, icebergTable);
     }

@@ -120,7 +120,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
 import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
 import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
-import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
 import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
 import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -277,8 +276,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

         Schema schema = toIcebergSchema(tableMetadata.getColumns());
 
-        verifyTypeSupported(schema);
-
         PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
 
         MetastoreContext metastoreContext = getMetastoreContext(session);
@@ -67,7 +67,6 @@
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
 import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
 import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
-import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
 import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
 import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
 import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
@@ -309,8 +308,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

         Schema schema = toIcebergSchema(tableMetadata.getColumns());
 
-        verifyTypeSupported(schema);
-
         PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
         FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
 
@@ -457,13 +457,6 @@ public static void validateTableMode(ConnectorSession session, org.apache.iceber
         }
     }
 
-    public static void verifyTypeSupported(Schema schema)
-    {
-        if (schema.columns().stream().anyMatch(column -> Types.TimestampType.withZone().equals(column.type()))) {
-            throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", Types.TimestampType.withZone()));
-        }
-    }
-
     public static Map<String, String> createIcebergViewProperties(ConnectorSession session, String prestoVersion)
     {
         return ImmutableMap.<String, String>builder()
@@ -61,6 +61,7 @@
 import static com.facebook.presto.common.type.RealType.REAL;
 import static com.facebook.presto.common.type.SmallintType.SMALLINT;
 import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static com.facebook.presto.common.type.TinyintType.TINYINT;
 import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
 import static com.facebook.presto.hive.HiveType.HIVE_BINARY;
@@ -118,6 +119,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
             case TIME:
                 return TimeType.TIME;
             case TIMESTAMP:
+                Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType();
+                if (timestampType.shouldAdjustToUTC()) {
+                    return TIMESTAMP_WITH_TIME_ZONE;
+                }
                 return TimestampType.TIMESTAMP;
             case STRING:
                 return VarcharType.createUnboundedVarcharType();
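The hunk above fixes the Iceberg-to-Presto direction. For the reverse direction raised in the documentation comment earlier, the converter presumably keys off the same flag; a hypothetical sketch, not the PR's actual toIcebergType code:

    import com.facebook.presto.common.type.TimestampType;
    import com.facebook.presto.common.type.TimestampWithTimeZoneType;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    public class TimestampMapping
    {
        // Hypothetical helper mapping Presto timestamp types to Iceberg timestamp types.
        static Type toIcebergTimestamp(com.facebook.presto.common.type.Type prestoType)
        {
            if (prestoType instanceof TimestampWithTimeZoneType) {
                return Types.TimestampType.withZone();    // rendered by Iceberg as "timestamptz"
            }
            if (prestoType instanceof TimestampType) {
                return Types.TimestampType.withoutZone(); // rendered by Iceberg as "timestamp"
            }
            throw new IllegalArgumentException("not a timestamp type: " + prestoType);
        }
    }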
@@ -89,10 +89,10 @@ public void testTimestamp()
     @Test
     public void testTimestampWithTimeZone()
     {
-        assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)", "Iceberg column type timestamptz is not supported");
-        assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Iceberg column type timestamptz is not supported");
-        assertUpdate("CREATE TABLE test_timestamp_with_timezone (x timestamp)");
-        assertQueryFails("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone", "Iceberg column type timestamptz is not supported");
+        assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)");
+        dropTable(getSession(), "test_timestamp_with_timezone");
+        assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'");
+        assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone");
         dropTable(getSession(), "test_timestamp_with_timezone");
     }

@@ -146,7 +146,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(
             Optional<Path> dataDirectory)
             throws Exception
     {
-        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, false, Optional.empty());
+        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, false);
     }
 
     public static DistributedQueryRunner createIcebergQueryRunner(
@@ -161,12 +161,13 @@
             boolean addStorageFormatToPath)
             throws Exception
     {
-        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, addStorageFormatToPath, Optional.empty());
+        return createIcebergQueryRunner(extraProperties, extraConnectorProperties, ImmutableMap.of(), format, createTpchTables, addJmxPlugin, nodeCount, externalWorkerLauncher, dataDirectory, addStorageFormatToPath, Optional.empty());
     }

     public static DistributedQueryRunner createIcebergQueryRunner(
             Map<String, String> extraProperties,
             Map<String, String> extraConnectorProperties,
+            Map<String, String> extraSessionProperties,
             FileFormat format,
             boolean createTpchTables,
             boolean addJmxPlugin,
             [...]
     {
         setupLogging();
 
-        Session session = testSessionBuilder()
+        Session.SessionBuilder sessionBuilder = testSessionBuilder()
                 .setCatalog(ICEBERG_CATALOG)
-                .setSchema(schemaName.orElse("tpch"))
-                .build();
+                .setSchema(schemaName.orElse("tpch"));
+
+        for (Map.Entry<String, String> property : extraSessionProperties.entrySet()) {
+            sessionBuilder.setCatalogSessionProperty(ICEBERG_CATALOG, property.getKey(), property.getValue());
+        }
+        Session session = sessionBuilder.build();
 
         DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
                 .setExtraProperties(extraProperties)
@@ -0,0 +1,124 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static com.google.common.base.Preconditions.checkState;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestIcebergTypes
extends AbstractTestQueryFramework
{
private QueryRunner batchReaderEnabledQueryRunner;

@Override
protected QueryRunner createQueryRunner() throws Exception
{
this.batchReaderEnabledQueryRunner = createIcebergQueryRunner(
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true"),
Member:
I think there is no need to refactor IcebergQueryRunner.createIcebergQueryRunner(...); just setting hive.parquet-batch-read-optimization-enabled to true in extraProperties would be OK.

Contributor (Author):
Maybe there is another way, but adding this to extraProperties actually causes the tests to break, since it is an unused property there. That would set it as a configuration property, but it needs to be a session property. The session is passed to the DistributedQueryRunner builder in its constructor, so we need some way to add properties to that session before building the runner, which is why I changed IcebergQueryRunner. Please let me know if this clears things up; if you have another approach in mind here, I would certainly be open to it!

Member:
Ah, sorry for overlooking that hive.parquet-batch-read-optimization-enabled belongs to the config in presto-hive-common. Since presto-main does not depend on presto-hive-common, TestingPrestoServer does not inject HiveCommonModule either. So you are right: currently we cannot set hive.parquet-batch-read-optimization-enabled in extraProperties here.

new IcebergConfig().getFileFormat(),
true,
false,
OptionalInt.empty(),
Optional.empty(),
Optional.empty(),
false,
Optional.empty());
return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of());
}
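
To illustrate the distinction discussed in the thread above, a minimal sketch of attaching a catalog session property (as opposed to a config property in extraProperties) to the session before the runner is built; the property key is assumed to match HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, and the rest of the setup is illustrative:

    import com.facebook.presto.Session;

    import static com.facebook.presto.testing.TestingSession.testSessionBuilder;

    public class SessionPropertySketch
    {
        public static Session sessionWithBatchReads()
        {
            // Catalog session properties live on the Session itself, not in the
            // connector config, so they work even when the module defining the
            // equivalent config property is not loaded by the test server.
            return testSessionBuilder()
                    .setCatalog("iceberg")
                    .setSchema("tpch")
                    .setCatalogSessionProperty("iceberg", "parquet_batch_read_optimization_enabled", "true")
                    .build();
        }
    }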

@DataProvider(name = "testTimestampWithTimezone")
public Object[][] createTestTimestampWithTimezoneData()
{
return new Object[][] {
{getQueryRunner()},
Contributor:
Wondering if we need a whole separate test for this. Can't we just create a data provider which passes in true/false values and lets us construct a valid session at the beginning of the test method? Then you could pass that session to all of the execute/assertQuery methods.

Contributor (Author):
Can you elaborate on this, please? I'm not sure what you are referring to as the separate test. I can have the data provider pass in one true and one false value and add a condition inside the test function itself; is that what you are asking for here? If so, what purpose would that serve? Thanks!

{getBatchReaderEnabledQueryRunner()}
};
}
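
For reference, a sketch of the alternative the reviewer describes: a data provider of booleans, with the session built inside the test method instead of keeping a second query runner. The methods below are hypothetical and would live in a class like TestIcebergTypes; the session-property key is an assumption:

    @DataProvider(name = "batchReadEnabled")
    public Object[][] batchReadEnabled()
    {
        return new Object[][] {{true}, {false}};
    }

    @Test(dataProvider = "batchReadEnabled")
    public void testTimestampWithTimezone(boolean batchReadEnabled)
    {
        // One runner, two sessions: toggle the session property per invocation.
        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("iceberg", "parquet_batch_read_optimization_enabled", Boolean.toString(batchReadEnabled))
                .build();
        getQueryRunner().execute(session, "SELECT 1");
    }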

@Test(dataProvider = "testTimestampWithTimezone")
public void testTimestampWithTimezone(QueryRunner runner)
{
String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'";
String timestamp = "TIMESTAMP '1984-12-08 00:10:00'";

runner.execute("CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");
String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")";
for (int i = 0; i < 10; i++) {
runner.execute("INSERT INTO test_timestamptz values " + row);
}

MaterializedResult initialRows = runner.execute("SELECT * FROM test_timestamptz");
List<Type> types = initialRows.getTypes();

assertTrue(types.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(types.get(1) instanceof TimestampType);

runner.execute("CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " +
"WITH (PARTITIONING = ARRAY['b'])");
runner.execute("INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz");

MaterializedResult partitionRows = runner.execute("SELECT * FROM test_timestamptz_partition");
List<Type> partitionTypes = partitionRows.getTypes();

assertTrue(partitionTypes.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(partitionTypes.get(1) instanceof TimestampType);

String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'";
runner.execute("CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");

for (int i = 0; i < 5; i++) {
runner.execute("INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")");
}
for (int i = 0; i < 5; i++) {
runner.execute("INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")");
}

MaterializedResult lateRows = runner.execute("SELECT a FROM test_timestamptz_filter WHERE a > " + earlyTimestamptz);
assertEquals(lateRows.getMaterializedRows().size(), 5);

MaterializedResult lateRowsFromEquals = runner.execute("SELECT a FROM test_timestamptz_filter WHERE a = " + timestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(lateRows, lateRowsFromEquals);

MaterializedResult earlyRows = runner.execute("SELECT a FROM test_timestamptz_filter WHERE a < " + timestamptz);
assertEquals(earlyRows.getMaterializedRows().size(), 5);

MaterializedResult earlyRowsFromEquals = runner.execute("SELECT a FROM test_timestamptz_filter WHERE a = " + earlyTimestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(earlyRows, earlyRowsFromEquals);
}

private QueryRunner getBatchReaderEnabledQueryRunner()
{
checkState(batchReaderEnabledQueryRunner != null, "batchReaderEnabledQueryRunner not set");
return batchReaderEnabledQueryRunner;
}
}
@@ -38,6 +38,8 @@
 import static com.facebook.presto.common.type.DoubleType.DOUBLE;
 import static com.facebook.presto.common.type.IntegerType.INTEGER;
 import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
 import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
 import static com.facebook.presto.iceberg.NestedFieldConverter.toIcebergNestedField;
@@ -176,6 +178,12 @@ protected static PrestoIcebergNestedField prestoIcebergNestedField(
case "date":
prestoType = DATE;
break;
case "timestamp":
prestoType = TIMESTAMP;
break;
case "timestamptz":
prestoType = TIMESTAMP_WITH_TIME_ZONE;
break;
case "nested":
prestoType = RowType.from(ImmutableList.of(
RowType.field("int", INTEGER),
@@ -239,6 +247,12 @@ protected static Types.NestedField nestedField(int id, String name)
case "date":
icebergType = Types.DateType.get();
break;
case "timestamp":
icebergType = Types.TimestampType.withoutZone();
break;
case "timestamptz":
icebergType = Types.TimestampType.withZone();
break;
case "nested":
icebergType = nested();
break;