Add destinations (statsd) #71

Open · wants to merge 1 commit into master
87 changes: 87 additions & 0 deletions contessa/destination.py
@@ -0,0 +1,87 @@
import logging
from typing import List, Text, Type

from datadog.dogstatsd.base import DEFAULT_HOST, DEFAULT_PORT

from contessa.db import Connector
from contessa.models import DQBase, QualityCheck
from datadog import DogStatsd


class Destination:
    def __init__(self):
        pass

    def ensure_destination(self, dq_class: Type[DQBase]) -> None:
        pass

    def persist(self, items: List[DQBase]) -> None:
        pass


class DBDestination(Destination):
    def __init__(self, conn_uri_or_engine: Text):
        super().__init__()
        self.conn_uri_or_engine = conn_uri_or_engine
        self.conn = Connector(conn_uri_or_engine)

    def ensure_destination(self, quality_check_class: Type[DQBase]) -> None:
        self.conn.ensure_table(quality_check_class.__table__)

    def persist(self, items: List[DQBase]) -> None:
        self.conn.upsert(items)


class StatsDDestination(Destination):
    def __init__(
        self,
        statsd_prefix: Text,
        statsd_host: Text = DEFAULT_HOST,
        statsd_port: int = DEFAULT_PORT,
    ):
        super().__init__()
        self.statsd = DogStatsd(host=statsd_host, port=statsd_port)
        self.statsd_prefix = statsd_prefix.rstrip(".")

    def ensure_destination(self, quality_check_class: Type[DQBase]) -> None:
        pass

    def persist(self, items: List[DQBase]) -> None:
        for item in items:
            if isinstance(item, QualityCheck):
Contributor: Can you either add a comment that we do not support ConsistencyCheck, or also write the ConsistencyCheck sender?

                self.persist_quality_check(item)
            else:
                logging.warning(
                    f"Not persisted. Reason: no handler for {type(item)} type. Skipping..."
                )

    def persist_quality_check(self, item: QualityCheck) -> None:
        tags = [
            f"rule_name:{item.rule_name}",
Contributor: This is not enough for uniqueness. You also need to include table_name and schema_name (or the two combined) and db_name/db_host, since you can use contessa with the same DD for multiple DBs, tables, and schemas.

f"rule_type:{item.rule_type}",
f"attribute:{item.attribute}",
]

self.statsd.increment(
f"{self.statsd_prefix}.total_records", item.total_records, tags=tags,
)

self.statsd.increment(
f"{self.statsd_prefix}.failed_records", item.failed, tags=tags,
)

self.statsd.increment(
f"{self.statsd_prefix}.passed_records", item.passed, tags=tags,
)

self.statsd.gauge(
f"{self.statsd_prefix}.failed_records_percentage",
item.failed_percentage,
tags=tags,
)

self.statsd.gauge(
f"{self.statsd_prefix}.passed_records_percentage",
item.passed_percentage,
tags=tags,
)
8 changes: 7 additions & 1 deletion contessa/executor.py
@@ -4,6 +4,7 @@
from typing import Dict

import pandas as pd
from pybigquery.sqlalchemy_bigquery import BigQueryDialect

from contessa.db import Connector
from contessa.models import Table
@@ -79,8 +80,13 @@ def compose_where_time_filter(self, rule):
            past = (
                self.context["task_ts"] - timedelta(days=each["days"])
            ).strftime("%Y-%m-%d %H:%M:%S UTC")
            timestamp_transformer = (
                "::timestamptz"
                if not isinstance(self.conn.engine.dialect, BigQueryDialect)
                else ""
            )
            result.append(
-               f"""{each["column"]} BETWEEN '{past}'::timestamptz AND '{present}'::timestamptz"""
+               f"""{each["column"]} BETWEEN '{past}'{timestamp_transformer} AND '{present}'{timestamp_transformer}"""
            )

        return " AND ".join(result)
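For illustration, a standalone sketch of the filter string this change produces under each dialect; the column name and timestamps here are hypothetical:

    from pybigquery.sqlalchemy_bigquery import BigQueryDialect

    def time_filter(dialect, column="created_at",
                    past="2019-11-01 00:00:00 UTC",
                    present="2019-11-08 00:00:00 UTC") -> str:
        # Mirrors the change above: BigQuery has no ::timestamptz cast
        # operator, so the cast suffix is dropped for that dialect.
        cast = "::timestamptz" if not isinstance(dialect, BigQueryDialect) else ""
        return f"{column} BETWEEN '{past}'{cast} AND '{present}'{cast}"

    # Postgres-style dialects yield:
    #   created_at BETWEEN '2019-11-01 00:00:00 UTC'::timestamptz AND '2019-11-08 00:00:00 UTC'::timestamptz
    # BigQueryDialect yields:
    #   created_at BETWEEN '2019-11-01 00:00:00 UTC' AND '2019-11-08 00:00:00 UTC'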
17 changes: 15 additions & 2 deletions contessa/runner.py
@@ -3,6 +3,7 @@

from datetime import datetime

from contessa.destination import Destination, DBDestination
from contessa.base_rules import Rule
from contessa.db import Connector
from contessa.executor import get_executor, refresh_executors
@@ -27,26 +28,38 @@ def run(
        check_table: Dict,
        result_table: Dict,  # todo - docs for quality name, maybe defaults..
        context: Optional[Dict] = None,
        destinations: List[Destination] = None,
Contributor: I'm not sure about this. If we allow multiple destinations, we should ensure consistency: if the DD send fails, we should roll back the DB transaction, and vice versa, which requires more code to handle. As I think about it, I would allow only one destination for now, just DD or DB. We (kiwi) will mostly use the DB, and if needed we can run the same code with a DD destination, or have a subscriber at the DB level that sends to DD (which is imho the better idea if you are using the DB as a destination). What do you think?

    ):
        check_table = Table(**check_table)
        result_table = ResultTable(**result_table, model_cls=self.model_cls)
        context = self.get_context(check_table, context)
        destinations = (
            destinations
            if destinations is not None
            else [DBDestination(self.conn_uri_or_engine)]
        )

        normalized_rules = self.normalize_rules(raw_rules)
        refresh_executors(check_table, self.conn, context)
        quality_check_class = self.get_quality_check_class(result_table)
        self.conn.ensure_table(quality_check_class.__table__)

        _ = [
            destination.ensure_destination(quality_check_class)
            for destination in destinations
        ]

        rules = self.build_rules(normalized_rules)
        objs = self.do_quality_checks(quality_check_class, rules, context)

-       self.conn.upsert(objs)
+       _ = [destination.persist(objs) for destination in destinations]
Contributor: Is there a reason to use a list comprehension here? Can we just write a normal for loop?


    @staticmethod
    def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict:
        """
        Construct context to pass to executors. User context overrides defaults.
        """
        if context is None:
Contributor: thx 👍

            context = {}
        ctx_defaults = {
            "table_fullname": check_table.fullname,
            "task_ts": datetime.now(),  # todo - is now() ok ?
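A minimal usage sketch of the new destinations parameter, assuming a Runner instance and rule list built elsewhere; the table dict keys and connection string are hypothetical:

    from contessa.destination import DBDestination, StatsDDestination

    # Hypothetical setup; actual Runner construction and table dicts may differ.
    runner.run(
        raw_rules=rules,
        check_table={"schema_name": "public", "table_name": "bookings"},
        result_table={"schema_name": "dq", "table_name": "quality_check_bookings"},
        destinations=[
            DBDestination("postgresql://user:password@localhost:5432/mydb"),
            StatsDDestination(statsd_prefix="contessa.bookings"),
        ],
    )
    # Omitting destinations keeps the old behaviour: results are upserted
    # into the DB via a default DBDestination.

Note the open review question above: with multiple destinations, a failed StatsD send after a successful DB upsert would leave the two stores inconsistent.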
3 changes: 2 additions & 1 deletion requirements.in
@@ -1,8 +1,9 @@
+datadog
 sqlalchemy>=1.2
 psycopg2-binary>=2.7
 pandas
 Jinja2
-pybigquery
+pybigquery==0.4.13
 alembic
 click
 packaging
12 changes: 8 additions & 4 deletions requirements.txt
@@ -1,3 +1,5 @@
#
# This file is autogenerated by pip-compile
# To update, run:
#
#    pip-compile --output-file=requirements.txt requirements.in
@@ -7,6 +9,8 @@ cachetools==3.1.1 # via google-auth
certifi==2019.9.11 # via requests
chardet==3.0.4 # via requests
click==7.0
+datadog==0.34.0
+decorator==4.4.1 # via datadog
future==0.18.1 # via pybigquery
google-api-core==1.14.2 # via google-cloud-core
google-auth==1.6.3 # via google-api-core
@@ -25,16 +29,16 @@ protobuf==3.10.0 # via google-api-core, google-cloud-bigquery, googleap
psycopg2-binary==2.8.3
pyasn1-modules==0.2.6 # via google-auth
pyasn1==0.4.7 # via pyasn1-modules, rsa
-pybigquery==0.4.11
+pybigquery==0.4.13
pyparsing==2.4.5 # via packaging
-python-dateutil==2.8.0 # via alembic, pandas
+python-dateutil==2.8.0 # via pandas
python-editor==1.0.4 # via alembic
pytz==2019.2 # via google-api-core, pandas
-requests==2.22.0 # via google-api-core
+requests==2.22.0 # via datadog, google-api-core
rsa==4.0 # via google-auth
six==1.12.0 # via google-api-core, google-auth, google-resumable-media, packaging, protobuf, python-dateutil
sqlalchemy==1.3.6
urllib3==1.25.6 # via requests

# The following packages are considered to be unsafe in a requirements file:
-# setuptools==41.6.0 # via google-api-core, protobuf
+# setuptools==45.2.0 # via google-api-core, protobuf