Skip to content

Commit

Permalink
move connector status logic to FivetranConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 13, 2024
1 parent a1cea87 commit 6e42757
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin

Expand Down Expand Up @@ -51,14 +50,6 @@
FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

INCOMPLETE = "incomplete"
CONNECTED = "connected"
BROKEN = "broken"


class FivetranResource(ConfigurableResource):
"""This class exposes methods on top of the Fivetran REST API."""

Expand Down Expand Up @@ -651,31 +642,24 @@ def fetch_fivetran_workspace_data(
group_id = group["id"]

destination_details = client.get_destination_details(destination_id=group_id)
destination_id = destination_details["id"]

destinations_by_id[destination_id] = FivetranDestination.from_destination_details(
destination = FivetranDestination.from_destination_details(
destination_details=destination_details
)
destinations_by_id[destination.id] = destination

connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
for connector_details in connectors_details:
if connector_details["status"]["setup_state"] in (
FivetranConnectorSetupStateType.INCOMPLETE,
FivetranConnectorSetupStateType.BROKEN,
):
continue

connector_id = connector_details["id"]
schema_config_details = client.get_schema_config_for_connector(
connector_id=connector_id
)

connectors_by_id[connector_id] = FivetranConnector.from_api_details(
connector = FivetranConnector.from_api_details(
connector_details=connector_details,
destination_details=destination_details,
schema_config_details=schema_config_details,
schema_config_details=client.get_schema_config_for_connector(
connector_id=connector_details["id"]
),
)

if not connector.has_bad_status:
connectors_by_id[connector.id] = connector

return FivetranWorkspaceData(
connectors_by_id=connectors_by_id, destinations_by_id=destinations_by_id
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

from dagster._core.definitions.asset_key import AssetKey
Expand All @@ -19,6 +20,14 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

INCOMPLETE = "incomplete"
CONNECTED = "connected"
BROKEN = "broken"


@whitelist_for_serdes
@record
class FivetranConnector:
Expand All @@ -29,6 +38,7 @@ class FivetranConnector:
service: str
schema_config: "FivetranSchemaConfig"
destination_id: str
has_bad_status: bool

@property
def url(self) -> str:
Expand All @@ -49,6 +59,11 @@ def from_api_details(
schema_config_details=schema_config_details
),
destination_id=destination_details["id"],
has_bad_status=connector_details["status"]["setup_state"]
in (
FivetranConnectorSetupStateType.INCOMPLETE,
FivetranConnectorSetupStateType.BROKEN,
),
)


Expand Down

0 comments on commit 6e42757

Please sign in to comment.