Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iceberg] Reduce redundant getTable calls in IcebergHiveMetadata #24376

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.MetricsConfig;
Expand All @@ -85,6 +88,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
Expand Down Expand Up @@ -132,6 +136,7 @@
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -148,10 +153,13 @@
public class IcebergHiveMetadata
extends IcebergAbstractMetadata
{
public static final int MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE = 1000;

private final ExtendedHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;
private final Cache<SchemaTableName, Optional<Table>> tableCache;

public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
Expand All @@ -169,6 +177,7 @@ public IcebergHiveMetadata(
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
this.tableCache = CacheBuilder.newBuilder().maximumSize(MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).build();
}

public ExtendedHiveMetastore getMetastore()
Expand All @@ -191,8 +200,7 @@ protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTa
@Override
protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName)
{
IcebergTableName name = IcebergTableName.from(schemaTableName.getTableName());
Optional<Table> hiveTable = metastore.getTable(getMetastoreContext(session), schemaTableName.getSchemaName(), name.getTableName());
Optional<Table> hiveTable = getHiveTable(session, schemaTableName);
if (!hiveTable.isPresent()) {
return false;
}
Expand All @@ -202,6 +210,22 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa
return true;
}

/**
 * Looks up a Hive table through a per-query memoizing cache so that repeated
 * lookups of the same {@link SchemaTableName} within one query hit the
 * metastore only once (the cache is bounded by MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).
 *
 * @param session the current connector session, used to build the metastore context
 * @param schemaTableName the schema-qualified table name to resolve
 * @return the Hive table if present in the metastore, otherwise {@link Optional#empty()}
 * @throws PrestoException if the underlying metastore lookup failed with one
 */
private Optional<Table> getHiveTable(ConnectorSession session, SchemaTableName schemaTableName)
{
// Strip any Iceberg suffix (e.g. "@snapshot") so the cache key and metastore lookup use the base table name
IcebergTableName name = IcebergTableName.from(schemaTableName.getTableName());
try {
return tableCache.get(schemaTableName, () ->
metastore.getTable(getMetastoreContext(session), schemaTableName.getSchemaName(), name.getTableName()));
}
catch (UncheckedExecutionException e) {
// Guava wraps runtime exceptions thrown by the loader; re-throw PrestoException
// unwrapped so callers see the original error, propagate anything else as-is.
throwIfInstanceOf(e.getCause(), PrestoException.class);
throw e;
}
catch (ExecutionException e) {
// Not expected to happen: per the review discussion, metastore.getTable declares no
// checked exceptions, so the loader cannot raise one. Kept generic for safety.
throw new RuntimeException("Unexpected checked exception by cache load from metastore", e);
}
Comment on lines +222 to +226
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small question: is there a reason not to throw a PrestoException here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this comment.

Ideally, an ExecutionException should never be thrown in this case since metastore.getTable does not throw any checked exceptions. Any exceptions from the getTable call would be wrapped in an UncheckedExecutionException, from which we will throw a PrestoException. Because of this, I opted to keep it generic here. However, if you feel it’s better to throw a PrestoException here in any case, I can make that change.

}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
Expand Down Expand Up @@ -369,7 +393,7 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta
encodeViewData(viewData));
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser());

Optional<Table> existing = metastore.getTable(metastoreContext, viewName.getSchemaName(), viewName.getTableName());
Optional<Table> existing = getHiveTable(session, viewName);
if (existing.isPresent()) {
if (!replace || !isPrestoView(existing.get())) {
throw new ViewAlreadyExistsException(viewName);
Expand Down Expand Up @@ -413,7 +437,7 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
}
MetastoreContext metastoreContext = getMetastoreContext(session);
for (SchemaTableName schemaTableName : tableNames) {
Optional<Table> table = metastore.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName());
Optional<Table> table = getHiveTable(session, schemaTableName);
if (table.isPresent() && isPrestoView(table.get())) {
verifyAndPopulateViews(table.get(), schemaTableName, decodeViewData(table.get().getViewOriginalText().get()), views);
}
Expand Down Expand Up @@ -527,7 +551,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
MetastoreContext metastoreContext = getMetastoreContext(session);
Table table = metastore.getTable(metastoreContext, icebergTableHandle.getSchemaTableName().getSchemaName(), icebergTableHandle.getSchemaTableName().getTableName())
Table table = getHiveTable(session, icebergTableHandle.getSchemaTableName())
.orElseThrow(() -> new TableNotFoundException(icebergTableHandle.getSchemaTableName()));

List<Column> partitionColumns = table.getPartitionColumns();
Expand Down
Loading