Skip to content

Commit

Permalink
[iceberg] Reduce redundant getTable calls in IcebergHiveMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
imjalpreet committed Jan 18, 2025
1 parent 1f3425b commit 70fe67e
Showing 1 changed file with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,13 @@
import com.facebook.presto.spi.statistics.TableStatisticType;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.MetricsConfig;
Expand All @@ -85,6 +88,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
Expand Down Expand Up @@ -132,6 +136,7 @@
import static com.facebook.presto.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -148,10 +153,13 @@
public class IcebergHiveMetadata
extends IcebergAbstractMetadata
{
public static final int MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE = 1000;

private final ExtendedHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final DateTimeZone timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(TimeZone.getDefault().getID())));
private final IcebergHiveTableOperationsConfig hiveTableOeprationsConfig;
private final Cache<SchemaTableName, Optional<Table>> tableCache;

public IcebergHiveMetadata(
ExtendedHiveMetastore metastore,
Expand All @@ -169,6 +177,7 @@ public IcebergHiveMetadata(
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hiveTableOeprationsConfig = requireNonNull(hiveTableOeprationsConfig, "hiveTableOperationsConfig is null");
this.tableCache = CacheBuilder.newBuilder().maximumSize(MAXIMUM_PER_QUERY_TABLE_CACHE_SIZE).build();
}

public ExtendedHiveMetastore getMetastore()
Expand All @@ -191,8 +200,7 @@ protected View getIcebergView(ConnectorSession session, SchemaTableName schemaTa
@Override
protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTableName)
{
IcebergTableName name = IcebergTableName.from(schemaTableName.getTableName());
Optional<Table> hiveTable = metastore.getTable(getMetastoreContext(session), schemaTableName.getSchemaName(), name.getTableName());
Optional<Table> hiveTable = getHiveTable(session, schemaTableName);
if (!hiveTable.isPresent()) {
return false;
}
Expand All @@ -202,6 +210,22 @@ protected boolean tableExists(ConnectorSession session, SchemaTableName schemaTa
return true;
}

/**
 * Looks up the Hive metastore table backing {@code schemaTableName}, memoizing the
 * result (including absence) in the per-query {@code tableCache} to avoid repeated
 * metastore round trips within a single query.
 *
 * <p>The cache is keyed by the raw {@link SchemaTableName}; the metastore lookup uses
 * the base table name parsed via {@link IcebergTableName#from} (stripping any Iceberg
 * name decorations).
 *
 * @throws PrestoException if the underlying metastore load failed with one
 */
private Optional<Table> getHiveTable(ConnectorSession session, SchemaTableName schemaTableName)
{
    IcebergTableName icebergTableName = IcebergTableName.from(schemaTableName.getTableName());
    try {
        return tableCache.get(
                schemaTableName,
                () -> metastore.getTable(getMetastoreContext(session), schemaTableName.getSchemaName(), icebergTableName.getTableName()));
    }
    catch (ExecutionException e) {
        // The loader only calls metastore.getTable, which throws unchecked exceptions;
        // a checked exception here would indicate a programming error.
        throw new RuntimeException("Unexpected checked exception by cache load from metastore", e);
    }
    catch (UncheckedExecutionException e) {
        // Guava wraps unchecked loader exceptions; surface PrestoException to callers unchanged.
        throwIfInstanceOf(e.getCause(), PrestoException.class);
        throw e;
    }
}

@Override
public List<String> listSchemaNames(ConnectorSession session)
{
Expand Down Expand Up @@ -369,7 +393,7 @@ public void createView(ConnectorSession session, ConnectorTableMetadata viewMeta
encodeViewData(viewData));
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(session.getUser());

Optional<Table> existing = metastore.getTable(metastoreContext, viewName.getSchemaName(), viewName.getTableName());
Optional<Table> existing = getHiveTable(session, viewName);
if (existing.isPresent()) {
if (!replace || !isPrestoView(existing.get())) {
throw new ViewAlreadyExistsException(viewName);
Expand Down Expand Up @@ -413,7 +437,7 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
}
MetastoreContext metastoreContext = getMetastoreContext(session);
for (SchemaTableName schemaTableName : tableNames) {
Optional<Table> table = metastore.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName());
Optional<Table> table = getHiveTable(session, schemaTableName);
if (table.isPresent() && isPrestoView(table.get())) {
verifyAndPopulateViews(table.get(), schemaTableName, decodeViewData(table.get().getViewOriginalText().get()), views);
}
Expand Down Expand Up @@ -527,7 +551,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
MetastoreContext metastoreContext = getMetastoreContext(session);
Table table = metastore.getTable(metastoreContext, icebergTableHandle.getSchemaTableName().getSchemaName(), icebergTableHandle.getSchemaTableName().getTableName())
Table table = getHiveTable(session, icebergTableHandle.getSchemaTableName())
.orElseThrow(() -> new TableNotFoundException(icebergTableHandle.getSchemaTableName()));

List<Column> partitionColumns = table.getPartitionColumns();
Expand Down

0 comments on commit 70fe67e

Please sign in to comment.