forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavro.py
233 lines (187 loc) · 7.83 KB
/
avro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import struct
from avro.errors import AvroTypeException, SchemaResolutionException
from avro.io import (
BinaryDecoder,
BinaryEncoder,
DatumReader,
DatumWriter,
validate,
)
# Pre-compiled struct codecs for fixed-width, big-endian (network byte order)
# primitives. Flink serializes these types with fixed sizes, unlike vanilla
# Avro which uses variable-length zig-zag encoding for int/long.
STRUCT_FLOAT = struct.Struct('>f')  # big-endian 4-byte IEEE-754 float
STRUCT_DOUBLE = struct.Struct('>d')  # big-endian 8-byte IEEE-754 double
STRUCT_INT = struct.Struct('>i')  # big-endian 4-byte signed int
STRUCT_LONG_LONG = struct.Struct('>q')  # big-endian 8-byte signed long
class FlinkAvroBufferWrapper(object):
    """File-like adapter whose backing stream can be swapped at runtime.

    Allows one long-lived decoder/encoder instance to be pointed at many
    successive input/output buffers via :meth:`switch_stream`, instead of
    constructing a new codec per buffer.
    """

    def __init__(self):
        # No backing stream until switch_stream() is called; read/write
        # before that will fail on the None stream.
        self._stream = None

    def switch_stream(self, stream):
        """Redirect all subsequent reads and writes to ``stream``."""
        self._stream = stream

    def read(self, n=1):
        """Read up to ``n`` bytes from the current stream."""
        return self._stream.read(n)

    def write(self, data):
        """Write ``data`` to the current stream and return its result."""
        return self._stream.write(data)
class FlinkAvroDecoder(BinaryDecoder):
    """Binary decoder for Flink's variant of the Avro wire format.

    Vanilla Avro encodes ``int`` and ``long`` as zig-zag varints; Flink
    instead writes them as fixed-width big-endian values in several places,
    and its variable-length long omits the zig-zag sign transformation.
    This decoder mirrors those choices on top of avro's ``BinaryDecoder``.
    """

    def __init__(self, reader):
        super().__init__(reader)

    def read_int(self):
        """Read a fixed 4-byte big-endian signed int."""
        return STRUCT_INT.unpack(self.read(4))[0]

    def read_long(self):
        """Read a fixed 8-byte big-endian signed long."""
        return STRUCT_LONG_LONG.unpack(self.read(8))[0]

    def read_var_long(self):
        """Read a little-endian base-128 varint.

        Flink's variable-length long serialization does not move the sign
        to the low bit and flip the rest (no zig-zag): each byte carries
        7 payload bits, the high bit marks continuation.
        """
        current = ord(self.read(1))
        value = current & 0x7F
        shift = 7
        while current & 0x80:
            current = ord(self.read(1))
            value |= (current & 0x7F) << shift
            shift += 7
        return value

    def read_float(self):
        """Read a 4-byte big-endian IEEE-754 float."""
        return STRUCT_FLOAT.unpack(self.read(4))[0]

    def read_double(self):
        """Read an 8-byte big-endian IEEE-754 double."""
        return STRUCT_DOUBLE.unpack(self.read(8))[0]

    def read_bytes(self):
        """Read a byte string prefixed with a fixed 4-byte int length."""
        size = self.read_int()
        assert size >= 0, size
        return self.read(size)

    def skip_int(self):
        """Skip a fixed 4-byte int."""
        self.skip(4)

    def skip_long(self):
        """Skip a fixed 8-byte long."""
        self.skip(8)

    def skip_bytes(self):
        """Skip a length-prefixed byte string."""
        size = self.read_int()
        assert size >= 0, size
        self.skip(size)
class FlinkAvroDatumReader(DatumReader):
    """Datum reader for composite structures in Flink's Avro wire format.

    Works on top of :class:`FlinkAvroDecoder`'s modified primitive
    encodings: collections arrive as a single block — a var-long element
    count, the elements, then a trailing var-long 0 terminator — and union
    branch indexes are fixed 4-byte ints rather than Avro varints.
    """

    def __init__(self, writer_schema=None, reader_schema=None):
        super().__init__(writer_schema, reader_schema)

    def read_array(self, writer_schema, reader_schema, decoder: 'FlinkAvroDecoder'):
        """Read a single-block array: count, items, end-of-blocks marker."""
        read_items = []
        block_count = decoder.read_var_long()
        assert block_count >= 0
        if block_count == 0:
            # Empty collection is encoded as the bare 0 count; no marker follows.
            return read_items
        for _ in range(block_count):
            read_items.append(self.read_data(writer_schema.items,
                                             reader_schema.items, decoder))
        decoder.read_var_long()  # consume the end-of-blocks marker (0)
        return read_items

    def skip_array(self, writer_schema, decoder: 'FlinkAvroDecoder'):
        """Skip an array without materializing its items."""
        block_count = decoder.read_var_long()
        assert block_count >= 0
        if block_count == 0:
            return
        for _ in range(block_count):
            self.skip_data(writer_schema.items, decoder)
        decoder.read_var_long()  # end-of-blocks marker

    def read_map(self, writer_schema, reader_schema, decoder: 'FlinkAvroDecoder'):
        """Read a single-block map: count, (key, value) pairs, end marker."""
        read_items = {}
        block_count = decoder.read_var_long()
        assert block_count >= 0
        if block_count == 0:
            return read_items
        for _ in range(block_count):
            key = decoder.read_utf8()
            read_items[key] = self.read_data(writer_schema.values,
                                             reader_schema.values, decoder)
        decoder.read_var_long()  # consume the end-of-blocks marker (0)
        return read_items

    def skip_map(self, writer_schema, decoder: 'FlinkAvroDecoder'):
        """Skip a map without materializing its entries."""
        block_count = decoder.read_var_long()
        assert block_count >= 0
        if block_count == 0:
            return
        for _ in range(block_count):
            decoder.skip_utf8()
            self.skip_data(writer_schema.values, decoder)
        # BUGFIX: was decoder.read_long(), which consumed 8 fixed bytes.
        # read_map consumes the same terminator with read_var_long (1+ bytes),
        # so skipping a map desynchronized the stream for everything after it.
        decoder.read_var_long()

    def read_union(self, writer_schema, reader_schema, decoder: 'FlinkAvroDecoder'):
        """Read a union: fixed 4-byte branch index, then the branch's datum."""
        index_of_schema = int(decoder.read_int())
        # Also reject negative indexes: a corrupt stream must not silently
        # select a branch from the end of the list via Python indexing.
        if not 0 <= index_of_schema < len(writer_schema.schemas):
            fail_msg = "Can't access branch index %d for union with %d branches" \
                       % (index_of_schema, len(writer_schema.schemas))
            raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
        selected_writer_schema = writer_schema.schemas[index_of_schema]
        return self.read_data(selected_writer_schema, reader_schema, decoder)

    def skip_union(self, writer_schema, decoder):
        """Skip a union: branch index, then the selected branch's datum."""
        index_of_schema = int(decoder.read_int())
        if not 0 <= index_of_schema < len(writer_schema.schemas):
            fail_msg = "Can't access branch index %d for union with %d branches" \
                       % (index_of_schema, len(writer_schema.schemas))
            raise SchemaResolutionException(fail_msg, writer_schema)
        return self.skip_data(writer_schema.schemas[index_of_schema], decoder)
class FlinkAvroEncoder(BinaryEncoder):
    """Binary encoder producing Flink's variant of the Avro wire format.

    Counterpart of :class:`FlinkAvroDecoder`: int/long/float/double are
    written as fixed-width big-endian values, and variable-length longs use
    plain (non-zig-zag) little-endian base-128 varints.
    """

    def __init__(self, writer):
        super().__init__(writer)

    def write_int(self, datum):
        """Write a fixed 4-byte big-endian signed int."""
        self.write(STRUCT_INT.pack(datum))

    def write_long(self, datum):
        """Write a fixed 8-byte big-endian signed long."""
        self.write(STRUCT_LONG_LONG.pack(datum))

    def write_var_long(self, datum):
        """Write a non-negative value as a little-endian base-128 varint.

        Mirrors FlinkAvroDecoder.read_var_long: 7 payload bits per byte,
        high bit set on every byte except the last, no zig-zag transform.

        BUGFIX: the previous implementation tested and emitted
        ``datum & 0x80`` (the continuation bit alone, discarding the payload
        bits) and shifted LEFT, so any value >= 128 produced garbage bytes
        and then raised OverflowError in to_bytes(1, ...). The output could
        never round-trip through read_var_long.
        """
        while (datum & ~0x7F) != 0:
            self.write(((datum & 0x7F) | 0x80).to_bytes(1, 'big'))
            datum >>= 7
        self.write(datum.to_bytes(1, 'big'))

    def write_float(self, datum):
        """Write a 4-byte big-endian IEEE-754 float."""
        self.write(STRUCT_FLOAT.pack(datum))

    def write_double(self, datum):
        """Write an 8-byte big-endian IEEE-754 double."""
        self.write(STRUCT_DOUBLE.pack(datum))

    def write_bytes(self, datum):
        """Write a byte string prefixed with a fixed 4-byte int length."""
        self.write_int(len(datum))
        self.write(datum)
class FlinkAvroDatumWriter(DatumWriter):
    """Datum writer emitting Flink's Avro wire format.

    Counterpart of :class:`FlinkAvroDatumReader`: collections are written
    as a single block — var-long count, elements, then a var-long 0
    terminator — and union branch indexes as fixed 4-byte ints.
    """

    def __init__(self, writer_schema=None):
        super().__init__(writers_schema=writer_schema)

    def write_array(self, writer_schema, datum, encoder):
        """Write ``datum`` (a list) as one block followed by a 0 terminator."""
        if len(datum) > 0:
            encoder.write_var_long(len(datum))
            for item in datum:
                self.write_data(writer_schema.items, item, encoder)
        # BUGFIX: was encoder.write_long(0), emitting an 8-byte terminator.
        # FlinkAvroDatumReader.read_array consumes the terminator with
        # read_var_long (1 byte), and write_map already uses write_var_long,
        # so arrays written here could not be read back.
        encoder.write_var_long(0)

    def write_map(self, writer_schema, datum, encoder):
        """Write ``datum`` (a dict) as one block followed by a 0 terminator."""
        if len(datum) > 0:
            encoder.write_var_long(len(datum))
            for key, value in datum.items():
                encoder.write_utf8(key)
                self.write_data(writer_schema.values, value, encoder)
        encoder.write_var_long(0)

    def write_union(self, writer_schema, datum, encoder):
        """Resolve the union branch for ``datum``, then write index + value.

        Raises AvroTypeException when no branch validates the datum.
        """
        # Resolve union. The full scan (no early break) preserves the
        # original behavior: the LAST matching branch wins.
        index_of_schema = -1
        for i, candidate_schema in enumerate(writer_schema.schemas):
            if validate(candidate_schema, datum):
                index_of_schema = i
        if index_of_schema < 0:
            raise AvroTypeException(writer_schema, datum)
        # Branch index is a fixed 4-byte int (not an Avro varint).
        encoder.write_int(index_of_schema)
        self.write_data(writer_schema.schemas[index_of_schema], datum, encoder)