diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 0443de1bdc1e1..54658136990bf 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -123,7 +123,15 @@ def _build_fivetran_assets( table: AssetKey([*asset_key_prefix, *table.split(".")]) if not translator_instance or not connection_metadata else translator_instance.get_asset_key( - FivetranConnectorTableProps(table=table, **connection_metadata._asdict()) + FivetranConnectorTableProps( + table=table, + connector_id=connection_metadata.connector_id, + name=connection_metadata.name, + connector_url=connection_metadata.connector_url, + schema_config=connection_metadata.schemas, + database=connection_metadata.database, + service=connection_metadata.service, + ) ) for table in destination_tables } @@ -156,7 +164,15 @@ def _build_fivetran_assets( ) if not translator_instance or not connection_metadata else translator_instance.get_asset_spec( - FivetranConnectorTableProps(table=table, **connection_metadata._asdict()) + FivetranConnectorTableProps( + table=table, + connector_id=connection_metadata.connector_id, + name=connection_metadata.name, + connector_url=connection_metadata.connector_url, + schema_config=connection_metadata.schemas, + database=connection_metadata.database, + service=connection_metadata.service, + ) ) for table in tracked_asset_keys.keys() ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 1fb5f06af3289..b4cbb87915ace 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,12 +1,12 @@ from enum import Enum -from typing import Any, Mapping, NamedTuple, Optional, Sequence +from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, cast from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._record import record from dagster._serdes.serdes import whitelist_for_serdes -from dagster_fivetran.utils import metadata_for_table +from dagster_fivetran.utils import get_fivetran_connector_url, metadata_for_table class FivetranConnectorTableProps(NamedTuple): @@ -14,7 +14,7 @@ class FivetranConnectorTableProps(NamedTuple): connector_id: str name: str connector_url: str - schemas: Mapping[str, Any] + schema_config: Mapping[str, Any] database: Optional[str] service: Optional[str] @@ -67,7 +67,44 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa """Method that converts a `FivetranWorkspaceData` object to a collection of `FivetranConnectorTableProps` objects. """ - raise NotImplementedError() + data: List[FivetranConnectorTableProps] = [] + + for connector_id, connector_data in self.connectors_by_id.items(): + connector_details = connector_data.properties + connector_name = connector_details["schema"] + connector_url = get_fivetran_connector_url(connector_details) + + schema_config = connector_details["schema_config"] + + destination_details = self.destinations_by_id[ + connector_details["destination_id"] + ].properties + database = destination_details.get("config", {}).get("database") + service = destination_details.get("service") + + schemas_data = cast(Dict[str, Any], schema_config["schemas"]) + for schema in schemas_data.values(): + if schema["enabled"]: + schema_name = schema["name_in_destination"] + schema_tables: Dict[str, Dict[str, Any]] = cast( + Dict[str, Dict[str, Any]], schema["tables"] + ) + for table in schema_tables.values(): + if table["enabled"]: + table_name = table["name_in_destination"] + data.append( + FivetranConnectorTableProps( + table=f"{schema_name}.{table_name}", + connector_id=connector_id, + name=connector_name, + connector_url=connector_url, + schema_config=schema_config, + database=database, + service=service, + ) + ) + + return data class DagsterFivetranTranslator: @@ -84,7 +121,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: schema_name, table_name = props.table.split(".") schema_entry = next( schema - for schema in props.schemas["schemas"].values() + for schema in props.schema_config["schemas"].values() if schema["name_in_destination"] == schema_name ) table_entry = next( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 1b3ab546c353d..98cd783e474b6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -215,16 +215,16 @@ "enable_new_by_default": True, "schemas": { "property1": { - "name_in_destination": "schema_name_in_destination", + "name_in_destination": "schema_name_in_destination_1", "enabled": True, "tables": { "property1": { "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination", + "name_in_destination": "table_name_in_destination_1", "enabled": True, "columns": { "property1": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_1", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -235,7 +235,7 @@ "is_primary_key": True, }, "property2": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_2", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -255,11 +255,11 @@ }, "property2": { "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination", + "name_in_destination": "table_name_in_destination_2", "enabled": True, "columns": { "property1": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_1", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -270,7 +270,7 @@ "is_primary_key": True, }, "property2": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_2", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -291,16 +291,16 @@ }, }, "property2": { - "name_in_destination": "schema_name_in_destination", + "name_in_destination": "schema_name_in_destination_2", "enabled": True, "tables": { "property1": { "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination", + "name_in_destination": "table_name_in_destination_1", "enabled": True, "columns": { "property1": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_1", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -311,7 +311,7 @@ "is_primary_key": True, }, "property2": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_1", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -331,11 +331,11 @@ }, "property2": { "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination", + "name_in_destination": "table_name_in_destination_2", "enabled": True, "columns": { "property1": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_1", "enabled": True, "hashed": False, "enabled_patch_settings": { @@ -346,7 +346,7 @@ "is_primary_key": True, }, "property2": { - "name_in_destination": "column_name_in_destination", + "name_in_destination": "column_name_in_destination_2", "enabled": True, "hashed": False, "enabled_patch_settings": { diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py new file mode 100644 index 0000000000000..3c35eb47def28 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py @@ -0,0 +1,30 @@ +import uuid +from typing import Callable + +from dagster_fivetran import FivetranWorkspace + + +def test_fivetran_workspace_data_to_fivetran_connector_table_props_data( + workspace_data_api_mocks_fn: Callable, +) -> None: + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret) + + with workspace_data_api_mocks_fn(include_sync_endpoints=False): + actual_workspace_data = resource.fetch_fivetran_workspace_data() + table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data() + assert len(table_props_data) == 4 + assert ( + table_props_data[0].table == "schema_name_in_destination_1.table_name_in_destination_1" + ) + assert ( + table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2" + ) + assert ( + table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1" + ) + assert ( + table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2" + )