Add destinations (statsd) #71
base: master
Conversation
It seems like this bq library you use doesn't have some features, like overriding project_id.
Does it block you from using it? Is there any solution you see? (Haven't checked myself.)
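For reference, if the destination can be built on the official google-cloud-bigquery client (an assumption - the bq library used in the PR isn't named in this thread), overriding the project is just a constructor argument; the project name below is hypothetical:

    # Hedged sketch: overriding the project with google-cloud-bigquery.
    # Whether this maps onto the library actually used in the PR is the
    # open question from the review above.
    from google.cloud import bigquery

    client = bigquery.Client(project="my-analytics-project")  # hypothetical project id
    result = client.query("SELECT 1").result()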
Also, there is some hard-coded Postgres-specific SQL code.
Only the timestamp, or are there more?
Also, the number of days for the time filter is hardcoded to 30 days.
We just kept it there to be backward compatible. You can use a List (with 1 object) to adjust the number of days - https://contessa.readthedocs.io/en/latest/features.html#time-filter (sorry, there should probably be a comment about it...)
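For illustration, a rule definition using the list form of the time filter might look roughly like this; the exact keys ("column", "days") are assumed from the linked docs page, so treat this as a sketch rather than a verified snippet:

    # Hedged sketch of a Contessa rule using the list form of time_filter
    # to change the window from the default 30 days.
    RULES = [
        {
            "name": "not_null",
            "columns": ["status"],
            "time_filter": [{"column": "created_at", "days": 7}],
        },
    ]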
    def persist(self, items: List[DQBase]) -> None:
        for item in items:
            if isinstance(item, QualityCheck):
Can you write a comment that we do not support ConsistencyCheck, or also write the ConsistencyCheck sender?
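A minimal sketch of how the limitation could be made explicit - the class names come from the diff above, but the else branch and the persist_quality_check call are assumptions about the PR's code, not a quote of it:

    def persist(self, items: List[DQBase]) -> None:
        for item in items:
            if isinstance(item, QualityCheck):
                self.persist_quality_check(item)
            else:
                # ConsistencyCheck (and any other DQBase subclass) is not
                # supported by this destination yet.
                raise NotImplementedError(
                    f"statsd destination does not support {type(item).__name__}"
                )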
    def persist_quality_check(self, item: QualityCheck) -> None:
        tags = [
            f"rule_name:{item.rule_name}",
This is not enough for uniqueness. You also need to include table_name, schema_name (or the two combined) and db_name/db_host, since you can use Contessa with the same DD for multiple DBs, tables, and schemas.
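Something along these lines would cover it - the attribute names other than rule_name are assumptions about what the QualityCheck object and the destination actually expose, so adjust them to the real model:

    # Hedged sketch of a tag set that stays unique across DBs, schemas and tables.
    tags = [
        f"rule_name:{item.rule_name}",
        f"table_name:{item.table_name}",
        f"schema_name:{item.schema_name}",
        f"db_name:{self.db_name}",  # or db_host, whichever the destination is configured with
    ]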
    rules = self.build_rules(normalized_rules)
    objs = self.do_quality_checks(quality_check_class, rules, context)

    self.conn.upsert(objs)
    _ = [destination.persist(objs) for destination in destinations]
Is there a reason to use a list comprehension? Can we just write a normal for loop?
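The plain-loop version would be equivalent and avoids building a throwaway list purely for side effects (same names as in the diff above):

    self.conn.upsert(objs)
    for destination in destinations:
        destination.persist(objs)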
    @staticmethod
    def get_context(check_table: Table, context: Optional[Dict] = None) -> Dict:
        """
        Construct context to pass to executors. User context overrides defaults.
        """
        if context is None:
thx 👍
    @@ -27,26 +28,38 @@ def run(
        check_table: Dict,
        result_table: Dict,  # todo - docs for quality name, maybe defaults..
        context: Optional[Dict] = None,
        destinations: List[Destination] = None,
I'm not sure about this. If we allow multiple destinations, we should ensure consistency, meaning that if the DD send fails, we should roll back the DB transaction and vice versa, which requires more code to handle.
As I think about it, I would for now allow only one destination: just DD or DB. We (Kiwi) will mostly use the DB, and if we needed to, we could just run the same code with a DD destination, or have a subscriber on the DB level that sends it to DD (which is imho a better idea in case you are using the DB as the destination).
What do you think?
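A rough sketch of the single-destination variant described above - the destination parameter and the dispatch at the end are assumptions about how run() could be reshaped, not code from the PR:

    def run(
        self,
        check_table: Dict,
        result_table: Dict,
        context: Optional[Dict] = None,
        destination: Optional[Destination] = None,  # exactly one sink: DD or DB
    ) -> None:
        ...
        objs = self.do_quality_checks(quality_check_class, rules, context)
        if destination is not None:
            destination.persist(objs)  # e.g. the statsd/Datadog destination
        else:
            self.conn.upsert(objs)     # default: the result DB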