Skip to content

Commit 66b57e5

Browse files
committed
Python: Non-Cython fallback Avro parser (#8521)
1 parent 9b193fa commit 66b57e5

File tree

5 files changed

+63
-82
lines changed

5 files changed

+63
-82
lines changed

build-module.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
import shutil
2020
from pathlib import Path
2121

22-
# Uncomment if your library can still function if extensions fail to compile.
23-
allowed_to_fail = False
24-
# allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1"
22+
allowed_to_fail = os.environ.get("CIBUILDWHEEL", "0") != "1"
2523

2624

2725
def build_cython_extensions() -> None:

pyiceberg/avro/decoder.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import io
1718
from abc import ABC, abstractmethod
1819
from io import SEEK_CUR
1920
from typing import (
2021
Dict,
2122
List,
2223
Tuple,
24+
Union,
2325
cast,
2426
)
2527

@@ -30,10 +32,6 @@
3032
class BinaryDecoder(ABC):
3133
"""Decodes bytes into Python physical primitives."""
3234

33-
@abstractmethod
34-
def __init__(self, input_stream: InputStream) -> None:
35-
"""Create the decoder."""
36-
3735
@abstractmethod
3836
def tell(self) -> int:
3937
"""Return the current position."""
@@ -138,10 +136,13 @@ class StreamingBinaryDecoder(BinaryDecoder):
138136
__slots__ = "_input_stream"
139137
_input_stream: InputStream
140138

141-
def __init__(self, input_stream: InputStream) -> None:
139+
def __init__(self, input_stream: Union[bytes, InputStream]) -> None:
142140
"""Reader is a Python object on which we can call read, seek, and tell."""
143-
super().__init__(input_stream)
144-
self._input_stream = input_stream
141+
if isinstance(input_stream, bytes):
142+
# In the case of bytes, we wrap it into a BytesIO to make it a stream
143+
self._input_stream = io.BytesIO(input_stream)
144+
else:
145+
self._input_stream = input_stream
145146

146147
def tell(self) -> int:
147148
"""Return the current stream position."""

pyiceberg/avro/file.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,14 @@
3535
)
3636

3737
from pyiceberg.avro.codecs import KNOWN_CODECS, Codec
38-
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder
38+
39+
try:
40+
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder as AvroDecoder
41+
except ModuleNotFoundError:
42+
import warnings
43+
44+
warnings.warn("Falling back to pure Python Avro decoder, missing Cython extension")
45+
from pyiceberg.avro.decoder import StreamingBinaryDecoder as AvroDecoder # type: ignore
3946
from pyiceberg.avro.encoder import BinaryEncoder
4047
from pyiceberg.avro.reader import ReadableDecoder, Reader
4148
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve
@@ -166,7 +173,7 @@ def __enter__(self) -> AvroFile[D]:
166173
A generator returning the AvroStructs.
167174
"""
168175
with self.input_file.open() as f:
169-
self.decoder = CythonBinaryDecoder(f.read())
176+
self.decoder = AvroDecoder(f.read())
170177
self.header = self._read_header()
171178
self.schema = self.header.get_schema()
172179
if not self.read_schema:
@@ -198,7 +205,7 @@ def _read_block(self) -> int:
198205
if codec := self.header.compression_codec():
199206
block_bytes = codec.decompress(block_bytes)
200207

201-
self.block = Block(reader=self.reader, block_records=block_records, block_decoder=CythonBinaryDecoder(block_bytes))
208+
self.block = Block(reader=self.reader, block_records=block_records, block_decoder=AvroDecoder(block_bytes))
202209
return block_records
203210

204211
def __next__(self) -> D:

tests/avro/test_decoder.py

+35-54
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
import io
2019
import itertools
2120
import struct
2221
from io import SEEK_SET
@@ -32,45 +31,38 @@
3231
from pyiceberg.io import InputStream
3332
from pyiceberg.types import DoubleType, FloatType
3433

35-
AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())]
36-
37-
CALLABLE_DECODER = Callable[[InputStream], ReadableDecoder]
34+
AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder]
3835

3936

4037
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
41-
def test_read_boolean_true(decoder_class: CALLABLE_DECODER) -> None:
42-
mis = io.BytesIO(b"\x01")
43-
decoder = decoder_class(mis)
38+
def test_read_boolean_true(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
39+
decoder = decoder_class(b"\x01")
4440
assert decoder.read_boolean() is True
4541

4642

4743
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
48-
def test_read_boolean_false(decoder_class: CALLABLE_DECODER) -> None:
49-
mis = io.BytesIO(b"\x00")
50-
decoder = decoder_class(mis)
44+
def test_read_boolean_false(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
45+
decoder = decoder_class(b"\x00")
5146
assert decoder.read_boolean() is False
5247

5348

5449
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
55-
def test_skip_boolean(decoder_class: CALLABLE_DECODER) -> None:
56-
mis = io.BytesIO(b"\x00")
57-
decoder = decoder_class(mis)
50+
def test_skip_boolean(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
51+
decoder = decoder_class(b"\x00")
5852
assert decoder.tell() == 0
5953
decoder.skip_boolean()
6054
assert decoder.tell() == 1
6155

6256

6357
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
64-
def test_read_int(decoder_class: CALLABLE_DECODER) -> None:
65-
mis = io.BytesIO(b"\x18")
66-
decoder = decoder_class(mis)
58+
def test_read_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
59+
decoder = decoder_class(b"\x18")
6760
assert decoder.read_int() == 12
6861

6962

7063
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
71-
def test_read_int_longer(decoder_class: CALLABLE_DECODER) -> None:
72-
mis = io.BytesIO(b"\x8e\xd1\x87\x01")
73-
decoder = decoder_class(mis)
64+
def test_read_int_longer(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
65+
decoder = decoder_class(b"\x8e\xd1\x87\x01")
7466
assert decoder.read_int() == 1111111
7567

7668

@@ -88,27 +80,24 @@ def zigzag_encode(datum: int) -> bytes:
8880
"decoder_class, expected_value",
8981
list(itertools.product(AVAILABLE_DECODERS, [0, -1, 2**32, -(2**32), (2**63 - 1), -(2**63)])),
9082
)
91-
def test_read_int_custom_encode(decoder_class: CALLABLE_DECODER, expected_value: int) -> None:
83+
def test_read_int_custom_encode(decoder_class: Callable[[bytes], ReadableDecoder], expected_value: int) -> None:
9284
encoded = zigzag_encode(expected_value)
93-
mis = io.BytesIO(encoded)
94-
decoder = decoder_class(mis)
85+
decoder = decoder_class(encoded)
9586
decoded = decoder.read_int()
9687
assert decoded == expected_value, f"Decoded value does not match decoded={decoded} expected={expected_value}"
9788

9889

9990
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
100-
def test_skip_int(decoder_class: CALLABLE_DECODER) -> None:
101-
mis = io.BytesIO(b"\x18")
102-
decoder = decoder_class(mis)
91+
def test_skip_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
92+
decoder = decoder_class(b"\x18")
10393
assert decoder.tell() == 0
10494
decoder.skip_int()
10595
assert decoder.tell() == 1
10696

10797

10898
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
109-
def test_read_negative_bytes(decoder_class: CALLABLE_DECODER) -> None:
110-
mis = io.BytesIO(b"")
111-
decoder = decoder_class(mis)
99+
def test_read_negative_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
100+
decoder = decoder_class(b"")
112101

113102
with pytest.raises(ValueError) as exc_info:
114103
decoder.read(-1)
@@ -148,70 +137,62 @@ def __exit__(
148137

149138
# InMemoryBinaryDecoder doesn't work for byte-at-a-time reading
150139
@pytest.mark.parametrize("decoder_class", [StreamingBinaryDecoder])
151-
def test_read_single_byte_at_the_time(decoder_class: CALLABLE_DECODER) -> None:
152-
decoder = decoder_class(OneByteAtATimeInputStream())
140+
def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
141+
decoder = decoder_class(OneByteAtATimeInputStream()) # type: ignore
153142
assert decoder.read(2) == b"\x01\x02"
154143

155144

156145
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
157-
def test_read_float(decoder_class: CALLABLE_DECODER) -> None:
158-
mis = io.BytesIO(b"\x00\x00\x9A\x41")
159-
decoder = decoder_class(mis)
146+
def test_read_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
147+
decoder = decoder_class(b"\x00\x00\x9A\x41")
160148
assert decoder.read_float() == 19.25
161149

162150

163151
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
164-
def test_skip_float(decoder_class: CALLABLE_DECODER) -> None:
165-
mis = io.BytesIO(b"\x00\x00\x9A\x41")
166-
decoder = decoder_class(mis)
152+
def test_skip_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
153+
decoder = decoder_class(b"\x00\x00\x9A\x41")
167154
assert decoder.tell() == 0
168155
decoder.skip_float()
169156
assert decoder.tell() == 4
170157

171158

172159
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
173-
def test_read_double(decoder_class: CALLABLE_DECODER) -> None:
174-
mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
175-
decoder = decoder_class(mis)
160+
def test_read_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
161+
decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40")
176162
assert decoder.read_double() == 19.25
177163

178164

179165
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
180-
def test_skip_double(decoder_class: CALLABLE_DECODER) -> None:
181-
mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
182-
decoder = decoder_class(mis)
166+
def test_skip_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
167+
decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40")
183168
assert decoder.tell() == 0
184169
decoder.skip_double()
185170
assert decoder.tell() == 8
186171

187172

188173
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
189-
def test_read_bytes(decoder_class: CALLABLE_DECODER) -> None:
190-
mis = io.BytesIO(b"\x08\x01\x02\x03\x04")
191-
decoder = decoder_class(mis)
174+
def test_read_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
175+
decoder = decoder_class(b"\x08\x01\x02\x03\x04")
192176
actual = decoder.read_bytes()
193177
assert actual == b"\x01\x02\x03\x04"
194178

195179

196180
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
197-
def test_read_utf8(decoder_class: CALLABLE_DECODER) -> None:
198-
mis = io.BytesIO(b"\x04\x76\x6F")
199-
decoder = decoder_class(mis)
181+
def test_read_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
182+
decoder = decoder_class(b"\x04\x76\x6F")
200183
assert decoder.read_utf8() == "vo"
201184

202185

203186
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
204-
def test_skip_utf8(decoder_class: CALLABLE_DECODER) -> None:
205-
mis = io.BytesIO(b"\x04\x76\x6F")
206-
decoder = decoder_class(mis)
187+
def test_skip_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
188+
decoder = decoder_class(b"\x04\x76\x6F")
207189
assert decoder.tell() == 0
208190
decoder.skip_utf8()
209191
assert decoder.tell() == 3
210192

211193

212194
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
213-
def test_read_int_as_float(decoder_class: CALLABLE_DECODER) -> None:
214-
mis = io.BytesIO(b"\x00\x00\x9A\x41")
215-
decoder = decoder_class(mis)
195+
def test_read_int_as_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
196+
decoder = decoder_class(b"\x00\x00\x9A\x41")
216197
reader = resolve(FloatType(), DoubleType())
217198
assert reader.read(decoder) == 19.25

tests/avro/test_reader.py

+9-15
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint:disable=protected-access
18-
import io
1918
import json
2019
from typing import Callable
2120

@@ -42,7 +41,6 @@
4241
UUIDReader,
4342
)
4443
from pyiceberg.avro.resolver import construct_reader
45-
from pyiceberg.io import InputStream
4644
from pyiceberg.io.pyarrow import PyArrowFileIO
4745
from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry
4846
from pyiceberg.schema import Schema
@@ -67,7 +65,7 @@
6765
UUIDType,
6866
)
6967

70-
AVAILABLE_DECODERS = [StreamingBinaryDecoder, lambda stream: CythonBinaryDecoder(stream.read())]
68+
AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder]
7169

7270

7371
def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
@@ -342,18 +340,16 @@ def test_uuid_reader() -> None:
342340

343341

344342
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
345-
def test_read_struct(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
346-
mis = io.BytesIO(b"\x18")
347-
decoder = decoder_class(mis)
343+
def test_read_struct(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
344+
decoder = decoder_class(b"\x18")
348345
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
349346
result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
350347
assert repr(result) == "Record[id=12]"
351348

352349

353350
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
354-
def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
355-
mis = io.BytesIO(b"\x18")
356-
decoder = decoder_class(mis)
351+
def test_read_struct_lambda(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
352+
decoder = decoder_class(b"\x18")
357353

358354
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
359355
# You can also pass in an arbitrary function that returns a struct
@@ -364,9 +360,8 @@ def test_read_struct_lambda(decoder_class: Callable[[InputStream], ReadableDecod
364360

365361

366362
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
367-
def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
368-
mis = io.BytesIO(b"\x18")
369-
decoder = decoder_class(mis)
363+
def test_read_not_struct_type(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
364+
decoder = decoder_class(b"\x18")
370365

371366
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
372367
with pytest.raises(ValueError) as exc_info:
@@ -376,9 +371,8 @@ def test_read_not_struct_type(decoder_class: Callable[[InputStream], ReadableDec
376371

377372

378373
@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
379-
def test_read_struct_exception_handling(decoder_class: Callable[[InputStream], ReadableDecoder]) -> None:
380-
mis = io.BytesIO(b"\x18")
381-
decoder = decoder_class(mis)
374+
def test_read_struct_exception_handling(decoder_class: Callable[[bytes], ReadableDecoder]) -> None:
375+
decoder = decoder_class(b"\x18")
382376

383377
def raise_err(struct: StructType) -> None:
384378
raise TypeError("boom")

0 commit comments

Comments
 (0)