-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add destinations (statsd) #71
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import logging | ||
from typing import List, Text, Type | ||
|
||
from datadog.dogstatsd.base import DEFAULT_HOST, DEFAULT_PORT | ||
|
||
from contessa.db import Connector | ||
from contessa.models import DQBase, QualityCheck | ||
from datadog import DogStatsd | ||
|
||
|
||
class Destination:
    """
    Base class for result destinations.

    Every method here is a no-op; subclasses override the hooks they need.
    """

    def __init__(self):
        """Nothing to set up in the base class."""

    def ensure_destination(self, dq_class: Type[DQBase]) -> None:
        """
        Hook called with the data-quality class before results are persisted.
        Does nothing in the base class.
        """
        return None

    def persist(self, items: List[DQBase]) -> None:
        """
        Hook called with the result objects to store.
        Does nothing in the base class.
        """
        return None
|
||
|
||
class DBDestination(Destination):
    """
    Destination that delegates to a ``Connector`` created from the given
    connection URI or engine.
    """

    def __init__(self, conn_uri_or_engine: Text):
        """
        :param conn_uri_or_engine: value handed to ``Connector``; also kept
            on the instance as ``conn_uri_or_engine``
        """
        super().__init__()
        self.conn_uri_or_engine = conn_uri_or_engine
        connector = Connector(conn_uri_or_engine)
        self.conn = connector

    def ensure_destination(self, quality_check_class: Type[DQBase]) -> None:
        """
        Ask the connector to ensure the table behind ``quality_check_class``.
        """
        table = quality_check_class.__table__
        self.conn.ensure_table(table)

    def persist(self, items: List[DQBase]) -> None:
        """
        Pass ``items`` to the connector's ``upsert``.
        """
        self.conn.upsert(items)
|
||
|
||
class StatsDDestination(Destination):
    """
    Destination that reports ``QualityCheck`` results through a ``DogStatsd``
    client as counters and gauges.
    """

    def __init__(
        self,
        statsd_prefix: Text,
        statsd_host: Text = DEFAULT_HOST,
        statsd_port: int = DEFAULT_PORT,
    ):
        """
        :param statsd_prefix: prefix of every metric name; trailing dots are
            stripped
        :param statsd_host: host passed to ``DogStatsd``
        :param statsd_port: port passed to ``DogStatsd``
        """
        super().__init__()
        self.statsd = DogStatsd(host=statsd_host, port=statsd_port)
        # Metric names are built as "<prefix>.<metric>", so a trailing dot in
        # the prefix would produce a double dot.
        self.statsd_prefix = statsd_prefix.rstrip(".")

    def ensure_destination(self, quality_check_class: Type[DQBase]) -> None:
        """
        Nothing is prepared for this destination; the method is a no-op.
        """
        return None

    def persist(self, items: List[DQBase]) -> None:
        """
        Send every ``QualityCheck`` in ``items``; any other item type is
        skipped with a warning.
        """
        for item in items:
            if not isinstance(item, QualityCheck):
                logging.warning(
                    f"Not persisted. Reason: no handler for {type(item)} type. Skipping..."
                )
                continue
            self.persist_quality_check(item)

    def persist_quality_check(self, item: QualityCheck) -> None:
        """
        Emit the record counts of ``item`` as increments and its percentages
        as gauges, all tagged with rule name, rule type and attribute.
        """
        # NOTE(review): a reviewer pointed out that these tags are not enough
        # for uniqueness - table_name, schema_name (or the two combined) and
        # db_name/db_host should be included as well, because the same Datadog
        # setup can be used for multiple DBs, tables and schemas.
        tags = [
            f"rule_name:{item.rule_name}",
            f"rule_type:{item.rule_type}",
            f"attribute:{item.attribute}",
        ]

        # (metric suffix, attribute of ``item`` holding the value)
        counters = (
            ("total_records", "total_records"),
            ("failed_records", "failed"),
            ("passed_records", "passed"),
        )
        gauges = (
            ("failed_records_percentage", "failed_percentage"),
            ("passed_records_percentage", "passed_percentage"),
        )

        # Attributes are read right before each call, one metric at a time.
        for suffix, attr in counters:
            self.statsd.increment(
                f"{self.statsd_prefix}.{suffix}", getattr(item, attr), tags=tags,
            )

        for suffix, attr in gauges:
            self.statsd.gauge(
                f"{self.statsd_prefix}.{suffix}", getattr(item, attr), tags=tags,
            )
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
|
||
from datetime import datetime | ||
|
||
from contessa.destination import Destination, DBDestination | ||
from contessa.base_rules import Rule | ||
from contessa.db import Connector | ||
from contessa.executor import get_executor, refresh_executors | ||
|
@@ -27,26 +28,38 @@ def run( | |
check_table: Dict, | ||
result_table: Dict, # todo - docs for quality name, maybe defaults.. | ||
context: Optional[Dict] = None, | ||
destinations: List[Destination] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure about this. If we allow multiple destinations, we should ensure consistency: if the DD send fails, we should roll back the DB transaction, and vice versa, which requires more code to handle. Thinking about it, I would allow only one destination for now - either DD or DB. We (Kiwi) will mostly use DB, and if we need to, we can just run the same code with a DD destination, or have a subscriber at the DB level that sends the results to DD (which is IMHO the better idea when you are using DB as the destination). What do you think? |
||
): | ||
check_table = Table(**check_table) | ||
result_table = ResultTable(**result_table, model_cls=self.model_cls) | ||
context = self.get_context(check_table, context) | ||
destinations = ( | ||
destinations | ||
if destinations is not None | ||
else [DBDestination(self.conn_uri_or_engine)] | ||
) | ||
|
||
normalized_rules = self.normalize_rules(raw_rules) | ||
refresh_executors(check_table, self.conn, context) | ||
quality_check_class = self.get_quality_check_class(result_table) | ||
self.conn.ensure_table(quality_check_class.__table__) | ||
|
||
_ = [ | ||
destination.ensure_destination(quality_check_class) | ||
for destination in destinations | ||
] | ||
|
||
rules = self.build_rules(normalized_rules) | ||
objs = self.do_quality_checks(quality_check_class, rules, context) | ||
|
||
self.conn.upsert(objs) | ||
_ = [destination.persist(objs) for destination in destinations] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason to use a list comprehension here? Can we just write a normal `for` loop? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
|
||
@staticmethod | ||
def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict: | ||
""" | ||
Construct context to pass to executors. User context overrides defaults. | ||
""" | ||
if context is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx 👍 |
||
context = {} | ||
ctx_defaults = { | ||
"table_fullname": check_table.fullname, | ||
"task_ts": datetime.now(), # todo - is now() ok ? | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
datadog | ||
sqlalchemy>=1.2 | ||
psycopg2-binary>=2.7 | ||
pandas | ||
Jinja2 | ||
pybigquery | ||
pybigquery==0.4.13 | ||
alembic | ||
click | ||
packaging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you write a comment saying that we do not support `ConsistencyCheck`, or also write the `ConsistencyCheck` sender?