Skip to content

Commit

Permalink
[5/n][dagster-fivetran] Implement FivetranWorkspaceData to `Fivetra…
Browse files Browse the repository at this point in the history
…nConnectorTableProps` method (#25797)

## Summary & Motivation

This PR implements
`FivetranWorkspaceData.to_fivetran_connector_table_props_data()`, a
method that converts a `FivetranWorkspaceData` object to a list of
FivetranConnectorTableProps.

To create the asset spec, we need one `FivetranConnectorTableProps`
object per connector table. This method parses the API raw data to
create the FivetranConnectorTableProps` object, which is compatible with
the `DagsterFivetranTranslator`.

This will be used in the `defs_from_state` method of the stated-backed
defs loader in a subsequent PR.

## How I Tested These Changes

Additional unit test
  • Loading branch information
maximearmstrong authored Nov 12, 2024
1 parent 6e9d452 commit d6f4f52
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,15 @@ def _build_fivetran_assets(
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
FivetranConnectorTableProps(
table=table,
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
database=connection_metadata.database,
service=connection_metadata.service,
)
).key
for table in destination_tables
}
Expand Down Expand Up @@ -155,7 +163,15 @@ def _build_fivetran_assets(
)
if not translator_instance or not connection_metadata
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
FivetranConnectorTableProps(
table=table,
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
database=connection_metadata.database,
service=connection_metadata.service,
)
)
for table in tracked_asset_keys.keys()
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, cast

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method

from dagster_fivetran.utils import metadata_for_table
from dagster_fivetran.utils import (
get_fivetran_connector_table_name,
get_fivetran_connector_url,
metadata_for_table,
)


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
schema_config: Mapping[str, Any]
database: Optional[str]
service: Optional[str]

Expand Down Expand Up @@ -63,11 +68,51 @@ def from_content_data(
},
)

@cached_method
def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()
data: List[FivetranConnectorTableProps] = []

for connector_id, connector_data in self.connectors_by_id.items():
connector_details = connector_data.properties
connector_name = connector_details["schema"]
connector_url = get_fivetran_connector_url(connector_details)

schema_config = connector_details["schema_config"]

destination_details = self.destinations_by_id[
connector_details["destination_id"]
].properties
database = destination_details.get("config", {}).get("database")
service = destination_details.get("service")

schemas_data = cast(Dict[str, Any], schema_config["schemas"])
for schema in schemas_data.values():
if schema["enabled"]:
schema_name = schema["name_in_destination"]
schema_tables: Dict[str, Dict[str, Any]] = cast(
Dict[str, Dict[str, Any]], schema["tables"]
)
for table in schema_tables.values():
if table["enabled"]:
table_name = table["name_in_destination"]
data.append(
FivetranConnectorTableProps(
table=get_fivetran_connector_table_name(
schema_name=schema_name, table_name=table_name
),
connector_id=connector_id,
name=connector_name,
connector_url=connector_url,
schema_config=schema_config,
database=database,
service=service,
)
)

return data


class DagsterFivetranTranslator:
Expand All @@ -80,7 +125,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
for schema in props.schema_config["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def get_fivetran_logs_url(connector_details: Mapping[str, Any]) -> str:
return f"{get_fivetran_connector_url(connector_details)}/logs"


def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str:
return f"{schema_name}.{table_name}"


def metadata_for_table(
table_data: Mapping[str, Any],
connector_url: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@
"enable_new_by_default": True,
"schemas": {
"property1": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_1",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -244,7 +244,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -264,11 +264,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -279,7 +279,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -300,16 +300,16 @@
},
},
"property2": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_2",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -320,7 +320,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -340,11 +340,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -355,7 +355,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -380,6 +380,9 @@
},
}

TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import uuid

import responses
from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_fetch_fivetran_workspace_data(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import uuid

import responses
from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_basic_resource_request(
connector_id: str,
destination_id: str,
group_id: str,
all_api_mocks: responses.RequestsMock,
) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

client = resource.get_client()
client.get_connector_details(connector_id=connector_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Callable

from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
fetch_workspace_data_api_mocks: Callable,
) -> None:
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()
assert len(table_props_data) == 4
assert table_props_data[0].table == "schema_name_in_destination_1.table_name_in_destination_1"
assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"

0 comments on commit d6f4f52

Please sign in to comment.