Skip to content

Commit

Permalink
[5/n][dagster-tableau] Implement FivetranWorkspaceData to FivetranCon…
Browse files Browse the repository at this point in the history
…nectorTableProps method
  • Loading branch information
maximearmstrong committed Nov 8, 2024
1 parent 252be0c commit d5a03b3
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,15 @@ def _build_fivetran_assets(
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
FivetranConnectorTableProps(
table=table,
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
database=connection_metadata.database,
service=connection_metadata.service,
)
).key
for table in destination_tables
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, cast

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes

from dagster_fivetran.utils import metadata_for_table
from dagster_fivetran.utils import get_fivetran_connector_url, metadata_for_table


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
schema_config: Mapping[str, Any]
database: Optional[str]
service: Optional[str]

Expand Down Expand Up @@ -67,7 +67,44 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()
data: List[FivetranConnectorTableProps] = []

for connector_id, connector_data in self.connectors_by_id.items():
connector_details = connector_data.properties
connector_name = connector_details["schema"]
connector_url = get_fivetran_connector_url(connector_details)

schema_config = connector_details["schema_config"]

destination_details = self.destinations_by_id[
connector_details["destination_id"]
].properties
database = destination_details.get("config", {}).get("database")
service = destination_details.get("service")

schemas_data = cast(Dict[str, Any], schema_config["schemas"])
for schema in schemas_data.values():
if schema["enabled"]:
schema_name = schema["name_in_destination"]
schema_tables: Dict[str, Dict[str, Any]] = cast(
Dict[str, Dict[str, Any]], schema["tables"]
)
for table in schema_tables.values():
if table["enabled"]:
table_name = table["name_in_destination"]
data.append(
FivetranConnectorTableProps(
table=f"{schema_name}.{table_name}",
connector_id=connector_id,
name=connector_name,
connector_url=connector_url,
schema_config=schema_config,
database=database,
service=service,
)
)

return data


class DagsterFivetranTranslator:
Expand All @@ -80,7 +117,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
for schema in props.schema_config["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,16 @@
"enable_new_by_default": True,
"schemas": {
"property1": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_1",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -235,7 +235,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -255,11 +255,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -270,7 +270,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -291,16 +291,16 @@
},
},
"property2": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_2",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -311,7 +311,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -331,11 +331,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -346,7 +346,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import uuid
from typing import Callable

from dagster_fivetran import FivetranWorkspace


def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
workspace_data_api_mocks_fn: Callable,
) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)

with workspace_data_api_mocks_fn(include_sync_endpoints=False):
actual_workspace_data = resource.fetch_fivetran_workspace_data()
table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()
assert len(table_props_data) == 4
assert (
table_props_data[0].table == "schema_name_in_destination_1.table_name_in_destination_1"
)
assert (
table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
)
assert (
table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
)
assert (
table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"
)

0 comments on commit d5a03b3

Please sign in to comment.