Feat(Source-S3): Use dataframe processing in place of singleton record operations (polars) #44194
base: master
Conversation
Force-pushed from dd354aa to 0901763.
@clnoll, @pnilan - If you have a sec, could you review this file's changes?
This works in my testing - the revised/refactored version attempts to handle more edge cases predictably. As discussed, prior to this PR we were hitting the condition where concurrency was defined (allowing concurrency in full-refresh mode) but the cursor was not concurrent (disabling concurrency in incremental mode). We could probably add a test to check for this, but for now I warn explicitly.
I also used the `continue` pattern to make the code slightly easier to read with less branching: when a case is handled earlier in the loop, we can disregard the remainder of the loop body (sketched below).
Let me know what you think! Thanks!
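For illustration, a minimal sketch of that early-`continue` pattern. The names here (`split_streams`, `supports_concurrency`, `has_concurrent_cursor`) are hypothetical and not from this PR; the real selection logic differs in its details.

```python
import logging
from typing import Any, Iterable

logger = logging.getLogger("source-s3")


def split_streams(streams: Iterable[Any]) -> tuple[list, list]:
    """Partition streams into concurrent and synchronous groups.

    Hypothetical helper illustrating the early-`continue` pattern.
    """
    concurrent, synchronous = [], []
    for stream in streams:
        if not getattr(stream, "supports_concurrency", False):
            synchronous.append(stream)
            continue  # Handled above; skip the rest of the loop body.

        if not getattr(stream, "has_concurrent_cursor", False):
            # Prior to this PR, this combination silently disabled
            # concurrency in incremental mode; now we warn explicitly.
            logger.warning("Stream has no concurrent cursor; running synchronously.")
            synchronous.append(stream)
            continue

        concurrent.append(stream)
    return concurrent, synchronous
```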
```diff
 from airbyte_cdk.sources.file_based.types import StreamState

 logger = logging.Logger("source-S3")


-class Cursor(DefaultFileBasedCursor):
+class Cursor(FileBasedConcurrentCursor):
```
@clnoll - I took a pass at refactoring with the concurrent base class. It looks like it is working smoothly now, but let me know if anything looks off! Thanks!
Looks right to me!
@aaronsteers the switch to the concurrent cursor looks fine to me.
I did a superficial pass at some of the other code and left some minor comments, but I know this is a draft so they're probably already on your mind.
```diff
@@ -71,6 +101,11 @@ class FileBasedStreamConfig(BaseModel):
         default=None,
         gt=0,
     )
+    bulk_mode: BulkMode = Field(
+        title="Bulk Processing Optimizations",
+        description="The bulk processing mode for this stream.",
```
Since this will be surfaced to users, it would be nice to give them more information about how to choose. If we dynamically select whether to use bulk mode when a user selects `AUTO`, we should also consider telling them the criteria we're using.
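As a sketch of what documenting that selection could look like. The `AUTO`/`ENABLED`/`DISABLED` members and the JSONL-only criterion are assumptions for illustration, not the PR's actual definition.

```python
from enum import Enum


class BulkMode(str, Enum):
    # Hypothetical members; the PR's actual enum may differ.
    AUTO = "auto"
    ENABLED = "enabled"
    DISABLED = "disabled"


def resolve_bulk_mode(mode: BulkMode, file_type: str) -> bool:
    """Decide whether to use bulk (dataframe) processing for a stream.

    Whatever criteria live here should also be spelled out in the
    user-facing field description so users know what AUTO does.
    """
    if mode is BulkMode.ENABLED:
        return True
    if mode is BulkMode.DISABLED:
        return False
    # AUTO: assumed criterion for illustration - only JSONL supports bulk mode so far.
    return file_type == "jsonl"
```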
Also - if this is only available for JSONL to start, it should probably be in the `JsonlFormat` file.
```diff
@@ -0,0 +1,46 @@
+"""Simple script to download secrets from GCS.
```
Out of curiosity, did you try using `ci_credentials`?
@clnoll - Yeah, I tried that first, before creating the GSMSecretManager class in PyAirbyte. The `ci_credentials` library didn't install or run cleanly when I tried it, and it printed secrets to console output by default: that's part of the GitHub "hide secrets" feature, but when run outside of GitHub Actions it has the reverse of the intended effect.
After a few attempts at using `ci_credentials`, I decided it would be easier to just bring the code into PyAirbyte. Some of the code was originally vendored in from `ci_credentials`, but by now it is pretty specialized for the PyAirbyte use cases.
The docs here show how PyAirbyte handles building the secret manager and getting secrets: https://airbytehq.github.io/PyAirbyte/airbyte/secrets.html
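For reference, a sketch of that flow based on the linked docs. Names such as `GoogleGSMSecretManager` and `fetch_connector_secrets` should be verified against that page, and the project ID and credentials below are placeholders.

```python
# Sketch based on the linked PyAirbyte docs; verify names against that page.
from airbyte.secrets import GoogleGSMSecretManager

creds_json_str = "..."  # placeholder: service-account JSON string

secret_mgr = GoogleGSMSecretManager(
    project="my-gcp-project",         # placeholder GCP project ID
    credentials_json=creds_json_str,
)

# Fetch the secrets labeled for a given connector and parse each JSON body.
for secret in secret_mgr.fetch_connector_secrets("source-s3"):
    config = secret.parse_json()
```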
Interesting, that's good to know. I think @alafanechere would probably be interested in the issues with `ci_credentials`.
```diff
 with stream_reader.open_file(
     file=file,
     mode=self.file_read_mode,
     encoding=None,
```
Might a user have a non-default encoding?
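If a user-configurable encoding were supported, one possible shape might be the following sketch; `self.config.encoding` is an assumed location for such a setting, not an existing attribute.

```python
# Hypothetical: honor a user-configured encoding instead of hard-coding None.
encoding = getattr(self.config, "encoding", None) or "utf-8"
with stream_reader.open_file(
    file=file,
    mode=self.file_read_mode,
    encoding=encoding,
) as fp:
    ...
```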
```diff
 df: pl.DataFrame = pl.read_ndjson(
     source=batch,
     # schema=schema,  # TODO: Add detected schema
     infer_schema_length=10,
```
Should this be using this config value?
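A sketch of wiring a configured value through instead of the hard-coded 10. `config.infer_schema_length` is an assumed attribute name for illustration; `pl.read_ndjson` does accept `schema` and `infer_schema_length` keyword arguments.

```python
import polars as pl

# `config.infer_schema_length` is an assumed config attribute, not from the PR.
df: pl.DataFrame = pl.read_ndjson(
    source=batch,
    schema=None,  # TODO from the diff: pass the detected schema here instead
    infer_schema_length=config.infer_schema_length or 100,
)
```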
What
This PR replaces the inner record loop with a dataframe-based transformation of records in bulk.
We use the Polars library.
One helpful way to think about this is as a move from procedural to functional programming, specifically for the operations we perform once on every record and for which processing speed matters most. Rather than managing a step-by-step process that operates on each record, we define the operations once and let the engine push down, consolidate, and parallelize them as much as possible (see the generic illustration below).
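This toy example is not code from the PR; it just contrasts a per-record loop with the equivalent Polars expressions:

```python
import polars as pl

records = [{"id": 1, "value": " a "}, {"id": 2, "value": " b "}]

# Procedural: a Python-level loop visits every record, one at a time.
cleaned = []
for record in records:
    cleaned.append({"id": record["id"] * 2, "value": record["value"].strip()})

# Functional/bulk: declare the operations once as expressions; Polars
# consolidates and parallelizes them across whole columns.
df = pl.DataFrame(records)
df = df.with_columns(
    (pl.col("id") * 2).alias("id"),
    pl.col("value").str.strip_chars().alias("value"),
)
```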
How
Perf Profile (Tentative)
With the latest updates, running locally on my Mac:
About 4x improvement versus the docker image, running locally on my Mac:
Still TODO
Related Docs