Feat(Source-S3): Use dataframe processing in place of singleton record operations (polars) #44194

Draft: wants to merge 70 commits into base: master
Changes from 64 commits (70 commits total)

Commits
726a722
initial code scaffold for dataframe processing
aaronsteers Aug 16, 2024
faa8517
drive-by-fix: typo in type hint
aaronsteers Aug 20, 2024
9fc9cf7
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Aug 20, 2024
50207aa
drive-by-fix: missing .gitignore for test artifact
aaronsteers Aug 20, 2024
392c43b
mark method as abstract
aaronsteers Aug 20, 2024
f8a7b7c
`poetry add polars` (TODO: Move to extra)
aaronsteers Aug 20, 2024
0a00d1e
implement `parse_records_to_dataframes()` for jsonl file type
aaronsteers Aug 20, 2024
cec132a
add config option `FileBasedStreamConfig.bulk_mode`
aaronsteers Aug 20, 2024
09e8bf7
apply new enum class
aaronsteers Aug 20, 2024
c78a576
checkpoint: basic plumbing in place
aaronsteers Aug 22, 2024
1dab0c4
resolve version conflicts in `source-s3`
aaronsteers Aug 22, 2024
63a8dc2
minor fixes
aaronsteers Aug 22, 2024
2d3031d
ability to step-debug "full refresh" acceptance tests
aaronsteers Aug 22, 2024
a7b1989
make polars part of the file-based extra
aaronsteers Aug 22, 2024
e8b4a2d
fix lock check in airbyte-ci
aaronsteers Aug 22, 2024
92733e9
script to download secret config
aaronsteers Aug 22, 2024
cbb8777
fix extra args
aaronsteers Aug 22, 2024
07f7929
cleanup secret fetch script
aaronsteers Aug 22, 2024
d0da02a
checkpoint: jsonl sync running successfully
aaronsteers Aug 23, 2024
87ea175
tidy secrets install script using latest pyairbyte features
aaronsteers Sep 2, 2024
82568e0
tidy some more
aaronsteers Sep 2, 2024
f8093ce
make helper script slightly more reusable
aaronsteers Sep 6, 2024
a487c13
use local CDK in poetry
aaronsteers Sep 6, 2024
3a6305d
add read_to_buffer stub
aaronsteers Sep 6, 2024
77120ab
improve handling
aaronsteers Sep 10, 2024
1ca2ead
add perftest
aaronsteers Sep 10, 2024
a92fa34
chore: perf tests
aaronsteers Sep 10, 2024
feac74a
add code to stream partition class
aaronsteers Sep 10, 2024
ed0032b
lint fixes
aaronsteers Sep 10, 2024
d1abb84
default to lazy
aaronsteers Sep 10, 2024
1e2e657
add `out` override arg
aaronsteers Sep 10, 2024
b8ff8cd
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Sep 10, 2024
b8c0b11
git: hide python venvs
aaronsteers Sep 12, 2024
2b0e986
update perf test script
aaronsteers Sep 12, 2024
9eece3f
move perf test script to new poetry project
aaronsteers Sep 17, 2024
42ed3ea
Merge remote-tracking branch 'origin/master' into aj/source-s3/datafr…
aaronsteers Sep 17, 2024
6744216
re-lock poetry in cdk and connector
aaronsteers Sep 17, 2024
b982e50
working sync and perf tests
aaronsteers Sep 17, 2024
1589768
chore: misc pr clean up
aaronsteers Sep 18, 2024
55e1f46
update perf test script
aaronsteers Sep 18, 2024
e00857f
fix: add missing file-based columns
aaronsteers Sep 18, 2024
1c5b7ea
chore: clean up pr
aaronsteers Sep 18, 2024
78a2a99
chore: clean up defaults
aaronsteers Sep 18, 2024
faa9068
clean up perf test script
aaronsteers Sep 18, 2024
952d90d
improve default bulk mode handling
aaronsteers Sep 18, 2024
209caf7
add bulk mode logging
aaronsteers Sep 18, 2024
7112041
improve bulk mode resolve
aaronsteers Sep 18, 2024
644861f
tidy
aaronsteers Sep 18, 2024
0901763
tidy
aaronsteers Sep 18, 2024
486fdab
tidy jsonl parser comments
aaronsteers Sep 18, 2024
cd5a7dd
rename variable
aaronsteers Sep 18, 2024
d7c7af7
fix type hint
aaronsteers Sep 18, 2024
0d80e81
update perf-test script
aaronsteers Sep 18, 2024
a94187d
chore: update comment
aaronsteers Sep 18, 2024
015fd90
delete unused
aaronsteers Sep 18, 2024
4cec7a9
multiple fixes, refactoring, including change to concurrent cursor fo…
aaronsteers Sep 20, 2024
60f843c
chore: add comment
aaronsteers Sep 20, 2024
6a20761
chore: add CLI entrypoint
aaronsteers Sep 20, 2024
5ebf102
update tests
aaronsteers Sep 20, 2024
87e43c7
update poetry projects and lock files
aaronsteers Sep 20, 2024
c930f69
remove very slow tests from acceptance tests
aaronsteers Sep 20, 2024
b2cd2c5
minor format stuff
aaronsteers Sep 20, 2024
51d511c
clean up files
aaronsteers Sep 20, 2024
9526835
update comment
aaronsteers Sep 20, 2024
da92440
update poetry
aaronsteers Sep 21, 2024
b946e43
update perf test
aaronsteers Sep 21, 2024
1919b8d
update cursor logic
aaronsteers Sep 21, 2024
2bcbc98
update concurrency
aaronsteers Sep 21, 2024
6aba77e
update poetry
aaronsteers Sep 21, 2024
5478be1
buffered reads
aaronsteers Sep 21, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
venv
.venv
.venv-*
.gradle
.idea
*.iml
1 change: 1 addition & 0 deletions airbyte-cdk/python/.gitignore
@@ -1,3 +1,4 @@
test_response.txt
.coverage

# TODO: these are tmp files generated by unit tests. They should go to the /tmp directory.
30 changes: 26 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
@@ -6,16 +6,19 @@
import importlib
import ipaddress
import logging
import os
import os.path
import socket
import sys
import tempfile
from collections import defaultdict
from functools import wraps
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional, TextIO
from urllib.parse import urlparse

import requests
from requests import PreparedRequest, Response, Session

from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import init_logger
@@ -235,14 +238,33 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
return


def launch(source: Source, args: List[str]) -> None:
def launch(
source: Source,
args: List[str],
output_stream: Optional[TextIO] = None,
) -> None:
"""Launch the source connector with the given arguments.

Optionally, you can provide an output stream to redirect the output of the source connector.
The default is `sys.stdout`, but you can also pass `os.devnull` to suppress output,
or any other file-like object.
"""
output_stream = output_stream or sys.stdout
source_entrypoint = AirbyteEntrypoint(source)
parsed_args = source_entrypoint.parse_args(args)
record_iterator = source_entrypoint.run(parsed_args)

if output_stream is os.devnull:
# Skip printing:
for _ in record_iterator:
pass
return

with PrintBuffer():
for message in source_entrypoint.run(parsed_args):
for message in record_iterator:
# Simply printing was creating issues for the concurrent CDK because Python uses two different instructions to print: one for the message and
# the other for the line break. Adding `\n` to the message ensures that both are printed at the same time.
print(f"{message}\n", end="", flush=True)
print(f"{message}\n", end="", flush=True, file=output_stream)


def _init_internal_request_filter() -> None:
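
For context, here is a minimal usage sketch of the new `output_stream` parameter. It is not part of this diff: `source` is assumed to be an already-constructed connector instance, and the config/catalog paths are placeholders.

```python
import os
import sys

from airbyte_cdk.entrypoint import launch


def run_silently(source, config_path: str, catalog_path: str) -> None:
    # Records are fully consumed but never printed (handy for perf tests).
    launch(
        source=source,
        args=["read", "--config", config_path, "--catalog", catalog_path],
        output_stream=os.devnull,
    )


def run_to_stderr(source, config_path: str, catalog_path: str) -> None:
    # Any file-like object works; the default remains sys.stdout.
    launch(
        source=source,
        args=["read", "--config", config_path, "--catalog", catalog_path],
        output_stream=sys.stderr,
    )
```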
@@ -133,7 +133,10 @@ def _handle_item(
concurrent_stream_processor: ConcurrentReadProcessor,
) -> Iterable[AirbyteMessage]:
# handle queue item and call the appropriate handler depending on the type of the queue item
if isinstance(queue_item, StreamThreadException):
if isinstance(queue_item, AirbyteMessage):
# Most likely a record message, pre-wrapped for perf reasons. Just yield it.
yield queue_item
elif isinstance(queue_item, StreamThreadException):
yield from concurrent_stream_processor.on_exception(queue_item)
elif isinstance(queue_item, PartitionGenerationCompletedSentinel):
yield from concurrent_stream_processor.on_partition_generation_completed(queue_item)
@@ -1,10 +1,13 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from __future__ import annotations

from enum import Enum
from typing import Any, List, Mapping, Optional, Union

from pydantic.v1 import BaseModel, Field, validator

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
@@ -13,7 +16,6 @@
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
from pydantic.v1 import BaseModel, Field, validator

PrimaryKeyType = Optional[Union[str, List[str]]]

@@ -24,6 +26,34 @@ class ValidationPolicy(Enum):
wait_for_discover = "Wait for Discover"


# TODO: Consider defaulting to DISABLED if unstable
DEFAULT_BULK_MODE = "DISABLED"


class ResolvedBulkMode(Enum):
DISABLED = "DISABLED"
ENABLED = "ENABLED"


class BulkMode(Enum):
"""Enabled bulk processing for file-based streams.

The in-memory mode is the fastest but requires enough memory to store all the records in memory.
The lazy mode is the slowest but requires the least amount of memory.
When bulk-mode is disabled, records are processed individually.
"""

DISABLED = "DISABLED"
ENABLED = "ENABLED"
AUTO = "AUTO"

def resolve(bulk_mode: BulkMode) -> ResolvedBulkMode:
if bulk_mode == BulkMode.AUTO:
return ResolvedBulkMode(DEFAULT_BULK_MODE)

return ResolvedBulkMode.DISABLED if bulk_mode == BulkMode.DISABLED else ResolvedBulkMode.ENABLED


class FileBasedStreamConfig(BaseModel):
name: str = Field(title="Name", description="The name of the stream.")
globs: Optional[List[str]] = Field(
@@ -71,6 +101,11 @@ class FileBasedStreamConfig(BaseModel):
default=None,
gt=0,
)
bulk_mode: BulkMode = Field(
title="Bulk Processing Optimizations",
description="The bulk processing mode for this stream.",
default=BulkMode.AUTO,
)

Contributor comment:
Since this will be surfaced to users, it would be nice to give them more information about how to choose. If we dynamically select whether we use bulk mode if a user selects AUTO, we should also consider telling them the criteria we're using.

Contributor comment:
Also - if this is only available for jsonl to start it should probably be in the JsonlFormat file.

@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
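
To make the `AUTO` behavior concrete, here is a self-contained sketch that mirrors the enums and `resolve()` helper introduced in this diff (illustrative only; the types are re-declared locally so the snippet runs in isolation).

```python
from enum import Enum


class ResolvedBulkMode(Enum):
    DISABLED = "DISABLED"
    ENABLED = "ENABLED"


class BulkMode(Enum):
    DISABLED = "DISABLED"
    ENABLED = "ENABLED"
    AUTO = "AUTO"


DEFAULT_BULK_MODE = "DISABLED"


def resolve(bulk_mode: BulkMode) -> ResolvedBulkMode:
    if bulk_mode == BulkMode.AUTO:
        # AUTO simply follows the hard-coded default; there are no per-stream heuristics yet.
        return ResolvedBulkMode(DEFAULT_BULK_MODE)
    return ResolvedBulkMode.DISABLED if bulk_mode == BulkMode.DISABLED else ResolvedBulkMode.ENABLED


assert resolve(BulkMode.AUTO) is ResolvedBulkMode.DISABLED
assert resolve(BulkMode.ENABLED) is ResolvedBulkMode.ENABLED
assert resolve(BulkMode.DISABLED) is ResolvedBulkMode.DISABLED
```

In other words, with `DEFAULT_BULK_MODE` set to `"DISABLED"`, the `AUTO` default leaves bulk processing off unless a stream explicitly enables it.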
Collaborator (PR author) comment:
@clnoll, @pnilan - If you have a sec, could you review this file's changes?

This works in my testing - the revised/refactored version attempts to handle more edge cases predictably. As discussed, previous to this PR, we were hitting the condition where concurrency was defined (allowing concurrency in full-refresh mode) but the cursor was not concurrent (disabling concurrency in incremental mode). We probably could add a test to check for this, but for now I warn explicitly.

I also used the continue pattern to make the code slightly easier to read with less branching. When handled by an earlier case, we can disregard the remainder of the code in the loop.

Let me know what you think! Thanks!

@@ -187,43 +187,61 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
)
self._validate_input_schema(stream_config)

sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
sync_mode: SyncMode | None = self._get_sync_mode_from_catalog(stream_config.name)
# Note: sync_mode may be `None` in `check` and `discover` modes.

if sync_mode == SyncMode.full_refresh and hasattr(self, "_concurrency_level") and self._concurrency_level is not None:
cursor = FileBasedFinalStateCursor(
stream_config=stream_config, stream_namespace=None, message_repository=self.message_repository
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
# A concurrency level is set but the cursor class is not concurrent. This is not allowed.
if (
hasattr(self, "_concurrency_level") and self._concurrency_level is not None
and not issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
):
self.logger.warning(
"An internal error occurred. The cursor class must be a concurrent "
"cursor if concurrency level is set. "
"Falling back to non-concurrent execution, which may be slower."
)
self._concurrency_level = None

elif (
sync_mode == SyncMode.incremental
and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
and hasattr(self, "_concurrency_level")
and self._concurrency_level is not None
):
assert (
state_manager is not None
), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."

cursor = self.cursor_cls(
stream_config,
stream_config.name,
None,
stream_state,
self.message_repository,
state_manager,
CursorField(DefaultFileBasedStream.ab_last_mod_col),
if not hasattr(self, "_concurrency_level") or self._concurrency_level is None:
# Concurrency not supported for this stream.
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(stream_config, cursor)
streams.append(stream)
continue

# Else, we have a concurrency level set and a valid concurrent cursor class.

if sync_mode == SyncMode.full_refresh or sync_mode is None:
cursor = FileBasedFinalStateCursor(
stream_config=stream_config, stream_namespace=None, message_repository=self.message_repository
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
)
else:
cursor = self.cursor_cls(stream_config)
stream = self._make_default_stream(stream_config, cursor)

streams.append(stream)
continue

# Else, incremental sync with concurrent cursor:

assert (
state_manager is not None
), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."

cursor = self.cursor_cls(
stream_config,
stream_config.name,
None,
stream_state,
self.message_repository,
state_manager,
CursorField(DefaultFileBasedStream.ab_last_mod_col),
)
stream = FileBasedStreamFacade.create_from_stream(
self._make_default_stream(stream_config, cursor), self, self.logger, stream_state, cursor
)
streams.append(stream)
continue

return streams

except ValidationError as exc:
@@ -105,3 +105,50 @@ def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
"""
prefixes = {glob.split("*")[0] for glob in globs}
return set(filter(lambda x: bool(x), prefixes))

def is_polars_supported(self, file: RemoteFile | None = None) -> bool:
"""
Return `True` if Polars is supported and `False` otherwise.
Optionally, the method can take a file to check if Polars is supported for that file.

The default implementation returns True only when the subclass implements both
`get_fully_qualified_uri()` and `polars_storage_options`; otherwise it returns False.
"""
try:
# If any part of this block raises an exception, we assume Polars is not supported
# and we return False.
if file and not self.get_fully_qualified_uri(file.uri):
return False

if not self.polars_storage_options:
return False
except NotImplementedError:
return False
else:
# No exceptions were raised, so we assume Polars is supported.
return True

def get_fully_qualified_uri(
self,
file_uri: str,
) -> str:
"""Returns the fully qualified URI for the given file URI.

For example, if the source uses S3, this method would prepend the bucket name to the URI.
"""
if "://" in file_uri:
return file_uri

raise NotImplementedError(
"The `get_fully_qualified_uri()` method is not implemented by class: " + type(self).__name__,
)

@property
def polars_storage_options(self) -> dict[str, str]:
"""Return storage options for the stream reader.

Raises:
NotImplementedError: If the method is not implemented by the concrete class.
"""
raise NotImplementedError(
"The `polars_storage_options()` method is not implemented by class: " + type(self).__name__,
)
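
For illustration, a concrete S3 reader might implement the two hooks above roughly as follows. This sketch is not part of the diff: the class name, the `self.config` attribute names, and the storage-option keys are assumptions (the keys follow the object_store-style names Polars generally accepts; check the Polars documentation for the exact set your deployment needs).

```python
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader


class HypotheticalS3StreamReader(AbstractFileBasedStreamReader):
    """Sketch of the overrides a concrete S3 reader would need for bulk mode.

    The remaining abstract methods (open_file, get_matching_files, ...) are omitted here.
    """

    def get_fully_qualified_uri(self, file_uri: str) -> str:
        if "://" in file_uri:
            return file_uri
        # Prepend the scheme and bucket so Polars can scan the object directly.
        return f"s3://{self.config.bucket}/{file_uri}"

    @property
    def polars_storage_options(self) -> dict[str, str]:
        # Credential key names are assumptions for this sketch.
        return {
            "aws_access_key_id": self.config.aws_access_key_id,
            "aws_secret_access_key": self.config.aws_secret_access_key,
            "aws_region": self.config.region_name,
        }
```

With both hooks implemented, `is_polars_supported()` returns True and the connector can hand fully qualified URIs plus credentials straight to Polars.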
@@ -6,6 +6,8 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

import polars as pl

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -81,3 +83,16 @@ def file_read_mode(self) -> FileReadMode:
The mode in which the file should be opened for reading.
"""
...

def parse_records_as_dataframes(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[pl.DataFrame | pl.LazyFrame]:
"""
Parse records and emit them as an iterable of Polars DataFrames or LazyFrames.
"""
raise NotImplementedError
@@ -6,6 +6,8 @@
import logging
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union

import polars as pl

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
@@ -87,6 +89,7 @@ def _parse_jsonl_entries(
logger: logging.Logger,
read_limit: bool = False,
) -> Iterable[Dict[str, Any]]:
"""Parse records and emit as iterable of dictionaries."""
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
read_bytes = 0

@@ -128,3 +131,47 @@ def _instantiate_accumulator(line: Union[bytes, str]) -> Union[bytes, str]:
return bytes("", json.detect_encoding(line))
elif isinstance(line, str):
return ""

def parse_records_as_dataframes(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[pl.DataFrame | pl.LazyFrame]:
"""Parse records and emit as iterable of data frames.

Currently this only returns an iterator containing a single data frame. This may
be updated in the future to return an iterator with multiple DataFrames.
"""

# The incoming URI is actually a relative path. We need the absolute ref, for
# instance: including the 's3://' protocol, bucket name, etc.
fully_qualified_uri = stream_reader.get_fully_qualified_uri(file.uri.split("#")[0])
storage_options = stream_reader.polars_storage_options
logger.info("Using bulk processing mode to read JSONL file: %s", fully_qualified_uri)

lazyframe: pl.LazyFrame = pl.scan_ndjson(
fully_qualified_uri,
storage_options=storage_options,
row_index_name="_ab_record_index",
infer_schema_length=10_000,
).with_columns(
pl.lit(file.uri).alias("_ab_source_file_url"),
pl.lit(file.last_modified).alias("_ab_source_file_last_modified")
)

def slice_generator(
lazyframe: pl.LazyFrame,
batch_size: int = 50_000,
) -> Iterable[pl.DataFrame]:
offset = 0
while True:
batch = lazyframe.slice(offset=offset, length=batch_size).collect(streaming=True)
if batch.height == 0:
break
yield batch
# Advance past the rows just yielded; without this, the same slice
# would be returned forever.
offset += batch.height

yield from slice_generator(lazyframe)
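
The same scan-and-slice pattern can be tried in isolation against a local NDJSON file (illustrative only; the file path and batch size are placeholders, and no storage options are needed for local paths):

```python
import polars as pl

# Lazily scan the file; nothing is read until a slice is collected.
lazyframe = pl.scan_ndjson("example.jsonl", row_index_name="_ab_record_index")

offset, batch_size = 0, 50_000
while True:
    batch = lazyframe.slice(offset=offset, length=batch_size).collect()
    if batch.height == 0:
        break
    # Each batch is a pl.DataFrame of up to `batch_size` rows; downstream code
    # would turn these rows into Airbyte record messages.
    print(batch.height, batch.columns)
    offset += batch.height
```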