Skip to content

Commit

Permalink
fix filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Nov 13, 2024
1 parent 1156ecb commit 8258e2a
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ def try_except_http_warn(self, should_catch: bool, msg: str) -> Iterator[None]:
raise

@cached_method
async def _fetch_dataset_upstreams_by_inode(self) -> Mapping[str, AbstractSet[str]]:
async def _fetch_dataset_upstreams_by_inode(
self, sigma_filter: SigmaFilter
) -> Mapping[str, AbstractSet[str]]:
"""Builds a mapping of dataset inodes to the upstream inputs they depend on.
Sigma does not expose this information directly, so we have to infer it from
the lineage of workbooks and the workbook queries.
Expand Down Expand Up @@ -309,12 +311,28 @@ async def build_deps_from_element(element: Dict[str, Any]) -> None:
]
)

await asyncio.gather(*[process_workbook(workbook) for workbook in raw_workbooks])
workbooks_to_fetch = []
if sigma_filter.workbook_folders:
workbook_filter_strings = [
"/".join(folder).lower() for folder in sigma_filter.workbook_folders
]
for workbook in raw_workbooks:
workbook_path = str(workbook["path"]).lower()
if any(
workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings
):
workbooks_to_fetch.append(workbook)
else:
workbooks_to_fetch = raw_workbooks

await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks_to_fetch])

return deps_by_dataset_inode

@cached_method
async def _fetch_dataset_columns_by_inode(self) -> Mapping[str, AbstractSet[str]]:
async def _fetch_dataset_columns_by_inode(
self, sigma_filter: SigmaFilter
) -> Mapping[str, AbstractSet[str]]:
"""Builds a mapping of dataset inodes to the columns they contain. Note that
this is a partial list and will only include columns which are referenced in
workbooks, since Sigma does not expose a direct API for querying dataset columns.
Expand Down Expand Up @@ -354,7 +372,21 @@ async def process_workbook(workbook: Dict[str, Any]) -> None:
inode, column_name = split
columns_by_dataset_inode[inode].add(column_name)

await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks])
workbooks_to_fetch = []
if sigma_filter.workbook_folders:
workbook_filter_strings = [
"/".join(folder).lower() for folder in sigma_filter.workbook_folders
]
for workbook in workbooks:
workbook_path = str(workbook["path"]).lower()
if any(
workbook_path.startswith(folder_str) for folder_str in workbook_filter_strings
):
workbooks_to_fetch.append(workbook)
else:
workbooks_to_fetch = workbooks

await asyncio.gather(*[process_workbook(workbook) for workbook in workbooks_to_fetch])

return columns_by_dataset_inode

Expand Down Expand Up @@ -443,10 +475,10 @@ async def build_organization_data(
)

datasets: List[SigmaDataset] = []
deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode()
deps_by_dataset_inode = await self._fetch_dataset_upstreams_by_inode(_sigma_filter)

columns_by_dataset_inode = (
await self._fetch_dataset_columns_by_inode() if fetch_column_data else {}
await self._fetch_dataset_columns_by_inode(_sigma_filter) if fetch_column_data else {}
)

logger.info("Fetching dataset data")
Expand Down

0 comments on commit 8258e2a

Please sign in to comment.