Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5/n][dagster-fivetran] Implement FivetranWorkspaceData to FivetranConnectorTableProps method #25797

Merged
merged 8 commits into from
Nov 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,15 @@ def _build_fivetran_assets(
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
FivetranConnectorTableProps(
table=table,
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
database=connection_metadata.database,
service=connection_metadata.service,
)
).key
for table in destination_tables
}
Expand Down Expand Up @@ -155,7 +163,15 @@ def _build_fivetran_assets(
)
if not translator_instance or not connection_metadata
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
FivetranConnectorTableProps(
table=table,
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
database=connection_metadata.database,
service=connection_metadata.service,
)
)
for table in tracked_asset_keys.keys()
],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence
from typing import Any, Dict, List, Mapping, NamedTuple, Optional, Sequence, cast

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method

from dagster_fivetran.utils import metadata_for_table
from dagster_fivetran.utils import (
get_fivetran_connector_table_name,
get_fivetran_connector_url,
metadata_for_table,
)


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
schema_config: Mapping[str, Any]
database: Optional[str]
service: Optional[str]

Expand Down Expand Up @@ -63,11 +68,51 @@ def from_content_data(
},
)

@cached_method
def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()
data: List[FivetranConnectorTableProps] = []

for connector_id, connector_data in self.connectors_by_id.items():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In airlift, from the raw API data we retrieved, we constructed a "lookup" object which built up the actual usable asset data from a set of cacheable properties. See

for an example of what I mean.

Since this is over the "cacheable" boundary, I'm wondering if there's a likelihood if it being called a bunch, and if so I think the cacheable structure likely makes sense. At the very least, I think that the conversion to FivetranConnectorTableProps should probably be cached, but it might also make sense to cache the stuff from 73-83 here as properties if there's potential for reuse (but if not, then maybe that's not worth the effort)

Copy link
Contributor Author

@maximearmstrong maximearmstrong Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. This method is meant to be used only in FivetranWorkspaceDefsLoader.defs_from_state, see implement in next PR #25807.

But I think caching it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cached the method in b272d07

connector_details = connector_data.properties
connector_name = connector_details["schema"]
connector_url = get_fivetran_connector_url(connector_details)

schema_config = connector_details["schema_config"]

destination_details = self.destinations_by_id[
connector_details["destination_id"]
].properties
database = destination_details.get("config", {}).get("database")
service = destination_details.get("service")

schemas_data = cast(Dict[str, Any], schema_config["schemas"])
for schema in schemas_data.values():
if schema["enabled"]:
schema_name = schema["name_in_destination"]
schema_tables: Dict[str, Dict[str, Any]] = cast(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strong code smell from this one. The complexity from the previous PR feels like it's spilling over into this one with all the casting and raw string munging we need to do. Let's strongly type all of these objects, I think things will feel much better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This comment applies here as well - this is taken from the legacy code to replicate the behavior. It will be updated in the same PR refactoring how we are storing destinations and connectors.

Dict[str, Dict[str, Any]], schema["tables"]
)
for table in schema_tables.values():
if table["enabled"]:
table_name = table["name_in_destination"]
data.append(
FivetranConnectorTableProps(
table=get_fivetran_connector_table_name(
schema_name=schema_name, table_name=table_name
),
connector_id=connector_id,
name=connector_name,
connector_url=connector_url,
schema_config=schema_config,
database=database,
service=service,
)
)

return data


class DagsterFivetranTranslator:
Expand All @@ -80,7 +125,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
for schema in props.schema_config["schemas"].values()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the name change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reflect Fivetran's ontology, see here

if schema["name_in_destination"] == schema_name
)
table_entry = next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def get_fivetran_logs_url(connector_details: Mapping[str, Any]) -> str:
return f"{get_fivetran_connector_url(connector_details)}/logs"


def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str:
return f"{schema_name}.{table_name}"


def metadata_for_table(
table_data: Mapping[str, Any],
connector_url: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@
"enable_new_by_default": True,
"schemas": {
"property1": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_1",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -244,7 +244,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -264,11 +264,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -279,7 +279,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -300,16 +300,16 @@
},
},
"property2": {
"name_in_destination": "schema_name_in_destination",
"name_in_destination": "schema_name_in_destination_2",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_1",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -320,7 +320,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -340,11 +340,11 @@
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"name_in_destination": "table_name_in_destination_2",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -355,7 +355,7 @@
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand All @@ -380,6 +380,9 @@
},
}

TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import uuid

import responses
from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_fetch_fivetran_workspace_data(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import uuid

import responses
from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_basic_resource_request(
connector_id: str,
destination_id: str,
group_id: str,
all_api_mocks: responses.RequestsMock,
) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

client = resource.get_client()
client.get_connector_details(connector_id=connector_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Callable

from dagster_fivetran import FivetranWorkspace

from dagster_fivetran_tests.experimental.conftest import TEST_API_KEY, TEST_API_SECRET


def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
fetch_workspace_data_api_mocks: Callable,
) -> None:
resource = FivetranWorkspace(api_key=TEST_API_KEY, api_secret=TEST_API_SECRET)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()
assert len(table_props_data) == 4
assert table_props_data[0].table == "schema_name_in_destination_1.table_name_in_destination_1"
assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"