[FLINK-37192] [pyflink] Replace deprecated avro-python3 with avro #26008

Closed
2 changes: 1 addition & 1 deletion flink-python/dev/dev-requirements.txt
@@ -20,7 +20,7 @@ cython>=0.29.24
 py4j==0.10.9.7
 python-dateutil>=2.8.0,<3
 cloudpickle~=2.2.0
-avro-python3>=1.8.1,!=1.9.2
+avro>=1.12.0
 pandas>=1.3.0
 pyarrow>=5.0.0
 pytz>=2018.3
9 changes: 4 additions & 5 deletions flink-python/pyflink/fn_execution/formats/avro.py
@@ -17,14 +17,13 @@
 ################################################################################
 import struct

+from avro.errors import AvroTypeException, SchemaResolutionException
 from avro.io import (
-    AvroTypeException,
     BinaryDecoder,
     BinaryEncoder,
     DatumReader,
     DatumWriter,
-    SchemaResolutionException,
-    Validate,
+    validate,
 )

 STRUCT_FLOAT = struct.Struct('>f')  # big-endian float
@@ -203,7 +202,7 @@ def write_bytes(self, datum):
 class FlinkAvroDatumWriter(DatumWriter):

     def __init__(self, writer_schema=None):
-        super().__init__(writer_schema=writer_schema)
+        super().__init__(writers_schema=writer_schema)

     def write_array(self, writer_schema, datum, encoder):
         if len(datum) > 0:
@@ -224,7 +223,7 @@ def write_union(self, writer_schema, datum, encoder):
         # resolve union
         index_of_schema = -1
         for i, candidate_schema in enumerate(writer_schema.schemas):
-            if Validate(candidate_schema, datum):
+            if validate(candidate_schema, datum):
                 index_of_schema = i
         if index_of_schema < 0:
             raise AvroTypeException(writer_schema, datum)
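The union-resolution loop in the `write_union` hunk can be read in isolation. The following is a minimal sketch, not the Flink or Avro implementation: schemas are reduced to type-name strings, `toy_validate` is a hypothetical stand-in for `avro.io.validate`, `resolve_union_index` is a made-up name, and it raises `TypeError` where the real code raises `AvroTypeException`. Because the loop in the hunk has no `break`, the last candidate that validates is the one selected.

```python
# Hypothetical mapping from toy schema names to Python types (an assumption
# for illustration only; real Avro schemas are objects, not strings).
_TOY_TYPES = {"null": type(None), "long": int, "string": str}


def toy_validate(schema_name, datum):
    """Stand-in for avro.io.validate: True if datum fits the toy schema."""
    return isinstance(datum, _TOY_TYPES[schema_name])


def resolve_union_index(candidate_schemas, datum, validate=toy_validate):
    """Mirror the loop in the hunk: remember the last candidate that validates."""
    index_of_schema = -1
    for i, candidate_schema in enumerate(candidate_schemas):
        if validate(candidate_schema, datum):
            index_of_schema = i  # no break, so a later match overrides this one
    if index_of_schema < 0:
        # The real code raises AvroTypeException(writer_schema, datum) here.
        raise TypeError(f"{datum!r} matches none of {candidate_schemas}")
    return index_of_schema


union = ["null", "long", "string"]
index_for_none = resolve_union_index(union, None)
index_for_text = resolve_union_index(union, "abc")
```

Under these assumptions, a datum that matches no branch (for example a float against the toy union above) raises instead of silently picking a branch.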
2 changes: 1 addition & 1 deletion flink-python/setup.py
@@ -318,7 +318,7 @@ def extracted_output_files(base_dir, file_path, output_directory):

 install_requires = ['py4j==0.10.9.7', 'python-dateutil>=2.8.0,<3',
 'apache-beam>=2.54.0,<=2.61.0',
-'cloudpickle>=2.2.0', 'avro-python3>=1.8.1,!=1.9.2',
+'cloudpickle>=2.2.0', 'avro>=1.12.0',
Contributor

There seem to be slight differences between the old avro-python3 package and the new preferred avro package. I assume the other changes in this PR account for those differences. The original code tolerated a range of versions; do we know why? Or is requiring the latest fine?

I am curious whether fastavro needs to be updated to be compatible with avro 1.12.0 in any way. fastavro 1.1 is 5 years old; should we push this to the latest as well?

Contributor Author

> There seem to be slight differences between the old avro-python3 package and the new preferred avro package. I assume the other changes in this PR account for those differences. The original code tolerated a range of versions; do we know why? Or is requiring the latest fine?

I think we are fine. avro-python3 was deprecated in favour of avro; it is the same package under a new name, but the updates also brought some breaking changes (changed package names, slightly renamed identifiers, etc.), and that is what the code changes here account for. We still accept a range of versions here, which afaik is a Python best practice so that consumers are not locked to one specific library version.
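As an illustration of the kind of rename involved, the sketch below picks the schema keyword by inspecting a writer class's constructor. It is not what this PR does (the PR simply requires avro>=1.12.0 and passes `writers_schema`). `schema_kwarg_name`, `make_writer`, `OldStyleWriter`, and `NewStyleWriter` are hypothetical names, and the two classes are stand-ins so the example runs without either Avro package installed.

```python
import inspect


def schema_kwarg_name(writer_cls):
    """Return the schema keyword that writer_cls.__init__ accepts.

    Assumption taken from the diff in this PR: the old package spelled the
    keyword ``writer_schema`` and the new one spells it ``writers_schema``.
    """
    params = inspect.signature(writer_cls.__init__).parameters
    for name in ("writers_schema", "writer_schema"):
        if name in params:
            return name
    raise TypeError(f"{writer_cls.__name__} accepts no known schema keyword")


# Hypothetical stand-ins for the two DatumWriter constructor styles.
class OldStyleWriter:
    def __init__(self, writer_schema=None):
        self.schema = writer_schema


class NewStyleWriter:
    def __init__(self, writers_schema=None):
        self.schema = writers_schema


def make_writer(writer_cls, schema):
    """Construct writer_cls, passing schema under whichever keyword it accepts."""
    return writer_cls(**{schema_kwarg_name(writer_cls): schema})


old_writer = make_writer(OldStyleWriter, "schema-a")
new_writer = make_writer(NewStyleWriter, "schema-b")
```

Requiring a single package version range, as this PR does, avoids the need for this kind of runtime detection altogether.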

> I am curious whether fastavro needs to be updated to be compatible with avro 1.12.0 in any way. fastavro 1.1 is 5 years old; should we push this to the latest as well?

fastavro is completely unrelated (beats me why Flink is using two separate Avro libraries, tbh). avro-python3/avro is a pure Python Avro implementation, while fastavro is written in C to offer faster execution. It might be worth using a single Python package for Avro or upgrading avro, but I think that is a bigger effort and should be out of scope for this smaller change.

 'pytz>=2018.3', 'fastavro>=1.1.0,!=1.8.0', 'requests>=2.26.0',
 'protobuf>=3.19.0',
 'numpy>=1.22.4',