Skip to content

Commit 37fa2f1

Browse files
committed
Fix spurious warnings and bogus index when reflecting Iceberg tables
1 parent 24cc388 commit 37fa2f1

File tree

1 file changed

+40
-2
lines changed

1 file changed

+40
-2
lines changed

trino/sqlalchemy/dialect.py

+40-2
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def _get_partitions(
218218
connection: Connection,
219219
table_name: str,
220220
schema: str = None
221-
) -> List[Dict[str, List[Any]]]:
221+
) -> List[str] | None:
222222
schema = schema or self._get_default_schema_name(connection)
223223
query = dedent(
224224
f"""
@@ -227,8 +227,46 @@ def _get_partitions(
227227
).strip()
228228
res = connection.execute(sql.text(query))
229229
partition_names = [desc[0] for desc in res.cursor.description]
230+
data_types = [desc[1] for desc in res.cursor.description]
231+
# Compare the column names and types to the shape of an Iceberg $partitions table
232+
if (partition_names == ['partition', 'record_count', 'file_count', 'total_size', 'data']
233+
and data_types[0].startswith('row(')
234+
and data_types[1] == 'bigint'
235+
and data_types[2] == 'bigint'
236+
and data_types[3] == 'bigint'
237+
and data_types[4].startswith('row(')):
238+
# This is an Iceberg $partitions table - these match the partition metadata columns
239+
return None
240+
# This is a Hive table - these are the partition names
230241
return partition_names
231242

243+
def _has_connector_name(self, connection: Connection):
244+
query = dedent(
245+
"""
246+
SELECT
247+
COUNT(*)
248+
FROM "system"."information_schema"."columns"
249+
WHERE "table_catalog" = 'system'
250+
AND "table_schema" = 'metadata'
251+
AND "table_name" = 'catalogs'
252+
AND "column_name" = 'connector_name'
253+
"""
254+
).strip()
255+
res = connection.execute(sql.text(query))
256+
return res.scalar() == 1
257+
258+
def _get_connector_name(self, connection: Connection, catalog_name: str):
259+
query = dedent(
260+
"""
261+
SELECT
262+
"connector_name"
263+
FROM "system"."metadata"."catalogs"
264+
WHERE "catalog_name" = :catalog_name
265+
"""
266+
).strip()
267+
res = connection.execute(sql.text(query), {"catalog_name": catalog_name})
268+
return res.scalar()
269+
232270
def get_pk_constraint(self, connection: Connection, table_name: str, schema: str = None, **kw) -> Dict[str, Any]:
233271
"""Trino has no support for primary keys. Returns a dummy"""
234272
return dict(name=None, constrained_columns=[])
@@ -326,7 +364,7 @@ def get_indexes(self, connection: Connection, table_name: str, schema: str = Non
326364
try:
327365
partitioned_columns = self._get_partitions(connection, f"{table_name}", schema)
328366
except Exception as e:
329-
# e.g. it's not a Hive table or an unpartitioned Hive table
367+
# e.g. it's an unpartitioned Hive table
330368
logger.debug("Couldn't fetch partition columns. schema: %s, table: %s, error: %s", schema, table_name, e)
331369
if not partitioned_columns:
332370
return []

0 commit comments

Comments
 (0)