Skip to content

Commit 9b0d865

Browse files
committed
fix: rb per object
1 parent da5e16c commit 9b0d865

File tree

3 files changed

+96
-115
lines changed

3 files changed

+96
-115
lines changed

src/event/format/json.rs

+58-71
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl Event {
134134
}
135135

136136
impl EventFormat for Event {
137-
type Data = Vec<Json>;
137+
type Data = Json;
138138

139139
// convert the incoming json to a vector of json values
140140
// also extract the arrow schema, tags and metadata from the incoming json
@@ -144,7 +144,7 @@ impl EventFormat for Event {
144144
time_partition_limit: Option<NonZeroU32>,
145145
custom_partitions: Option<&String>,
146146
schema_version: SchemaVersion,
147-
) -> anyhow::Result<Self::Data> {
147+
) -> anyhow::Result<Vec<Self::Data>> {
148148
self.flatten_logs(
149149
time_partition,
150150
time_partition_limit,
@@ -161,17 +161,18 @@ impl EventFormat for Event {
161161
schema_version: SchemaVersion,
162162
) -> anyhow::Result<(super::EventSchema, bool)> {
163163
// collect all the keys from all the json objects in the request body
164-
let fields = collect_keys(data.iter());
164+
let fields = collect_keys(data);
165165

166166
let mut is_first = false;
167167
let schema = if let Some(schema) = derive_arrow_schema(stored_schema, fields) {
168168
schema
169169
} else {
170170
// TODO:
171-
let mut infer_schema = infer_json_schema_from_iterator(
172-
data.iter().map(|obj| Ok(Value::Object(obj.clone()))),
173-
)
174-
.map_err(|err| anyhow!("Could not infer schema for this event due to err {:?}", err))?;
171+
let mut infer_schema =
172+
infer_json_schema_from_iterator([Ok(Value::Object(data.clone()))].into_iter())
173+
.map_err(|err| {
174+
anyhow!("Could not infer schema for this event due to err {:?}", err)
175+
})?;
175176
let new_infer_schema = super::update_field_type_in_schema(
176177
Arc::new(infer_schema),
177178
Some(stored_schema),
@@ -200,10 +201,7 @@ impl EventFormat for Event {
200201
.collect()
201202
};
202203

203-
if data
204-
.iter()
205-
.any(|value| fields_mismatch(&schema, value, schema_version))
206-
{
204+
if fields_mismatch(&schema, data, schema_version) {
207205
return Err(anyhow!(
208206
"Could not process this event due to mismatch in datatype"
209207
));
@@ -215,14 +213,14 @@ impl EventFormat for Event {
215213
}
216214

217215
// Convert the Data type (defined above) to arrow record batch
218-
fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
216+
fn decode(data: &[Self::Data], schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
219217
let array_capacity = round_upto_multiple_of_64(data.len());
220218
let mut reader = ReaderBuilder::new(schema)
221219
.with_batch_size(array_capacity)
222220
.with_coerce_primitive(false)
223221
.build_decoder()?;
224222

225-
reader.serialize(&data)?;
223+
reader.serialize(data)?;
226224
match reader.flush() {
227225
Ok(Some(recordbatch)) => Ok(recordbatch),
228226
Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
@@ -248,16 +246,18 @@ impl EventFormat for Event {
248246
custom_partitions.as_ref(),
249247
schema_version,
250248
)?;
251-
let (schema, is_first_event) = Self::infer_schema(
252-
&data,
253-
&stored_schema,
254-
time_partition.as_ref(),
255-
static_schema_flag,
256-
schema_version,
257-
)?;
258249

250+
let mut is_first_event = false;
259251
let mut partitions = HashMap::new();
260252
for json in data {
253+
let (schema, is_first) = Self::infer_schema(
254+
&json,
255+
&stored_schema,
256+
time_partition.as_ref(),
257+
static_schema_flag,
258+
schema_version,
259+
)?;
260+
is_first_event = is_first_event || is_first;
261261
let custom_partition_values = match custom_partitions.as_ref() {
262262
Some(custom_partitions) => {
263263
let custom_partitions = custom_partitions.split(',').collect_vec();
@@ -273,7 +273,7 @@ impl EventFormat for Event {
273273

274274
let batch = Self::into_recordbatch(
275275
p_timestamp,
276-
vec![json],
276+
&[json],
277277
&schema,
278278
time_partition.as_ref(),
279279
schema_version,
@@ -368,15 +368,8 @@ fn derive_arrow_schema(
368368

369369
// Returns a list of keys that are present in the given iterable of JSON objects
370370
// Returns None if even one of the values is not an Object
371-
fn collect_keys<'a>(objects: impl Iterator<Item = &'a Json>) -> HashSet<&'a str> {
372-
let mut keys = HashSet::new();
373-
for object in objects {
374-
for key in object.keys() {
375-
keys.insert(key.as_str());
376-
}
377-
}
378-
379-
keys
371+
fn collect_keys(object: &Json) -> HashSet<&str> {
372+
object.keys().map(|k| k.as_str()).collect()
380373
}
381374

382375
// Returns true when the field doesn't exist in schema or has an invalid type
@@ -515,9 +508,9 @@ mod tests {
515508
.to_data(None, None, None, SchemaVersion::V0)
516509
.unwrap();
517510
let (schema, _) =
518-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
511+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
519512
let rb =
520-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
513+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
521514

522515
assert_eq!(rb.num_rows(), 1);
523516
assert_eq!(rb.num_columns(), 4);
@@ -548,9 +541,9 @@ mod tests {
548541
.to_data(None, None, None, SchemaVersion::V0)
549542
.unwrap();
550543
let (schema, _) =
551-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
544+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
552545
let rb =
553-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
546+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
554547

555548
assert_eq!(rb.num_rows(), 1);
556549
assert_eq!(rb.num_columns(), 3);
@@ -583,9 +576,9 @@ mod tests {
583576
.to_data(None, None, None, SchemaVersion::V0)
584577
.unwrap();
585578
let (schema, _) =
586-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
579+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
587580
let rb =
588-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
581+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
589582

590583
assert_eq!(rb.num_rows(), 1);
591584
assert_eq!(rb.num_columns(), 3);
@@ -619,7 +612,9 @@ mod tests {
619612
.to_data(None, None, None, SchemaVersion::V0)
620613
.unwrap();
621614

622-
assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
615+
assert!(
616+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).is_err()
617+
);
623618
}
624619

625620
#[test]
@@ -639,9 +634,9 @@ mod tests {
639634
.to_data(None, None, None, SchemaVersion::V0)
640635
.unwrap();
641636
let (schema, _) =
642-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
637+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
643638
let rb =
644-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
639+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
645640

646641
assert_eq!(rb.num_rows(), 1);
647642
assert_eq!(rb.num_columns(), 1);
@@ -670,9 +665,9 @@ mod tests {
670665
.to_data(None, None, None, SchemaVersion::V0)
671666
.unwrap();
672667
let (schema, _) =
673-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
668+
Event::infer_schema(&data[1], &store_schema, None, false, SchemaVersion::V0).unwrap();
674669
let rb =
675-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
670+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
676671

677672
assert_eq!(rb.num_rows(), 3);
678673
assert_eq!(rb.num_columns(), 4);
@@ -723,9 +718,9 @@ mod tests {
723718
.to_data(None, None, None, SchemaVersion::V0)
724719
.unwrap();
725720
let (schema, _) =
726-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
721+
Event::infer_schema(&data[1], &store_schema, None, false, SchemaVersion::V0).unwrap();
727722
let rb =
728-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
723+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
729724

730725
assert_eq!(rb.num_rows(), 3);
731726
assert_eq!(rb.num_columns(), 4);
@@ -775,9 +770,9 @@ mod tests {
775770
.to_data(None, None, None, SchemaVersion::V0)
776771
.unwrap();
777772
let (schema, _) =
778-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
773+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
779774
let rb =
780-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
775+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
781776

782777
assert_eq!(rb.num_rows(), 3);
783778
assert_eq!(rb.num_columns(), 4);
@@ -797,23 +792,12 @@ mod tests {
797792

798793
#[test]
799794
fn arr_schema_mismatch() {
800-
let json = json!([
801-
{
802-
"a": null,
803-
"b": "hello",
804-
"c": 1.24
805-
},
806-
{
807-
"a": 1,
808-
"b": "hello",
809-
"c": 1
810-
},
811-
{
812-
"a": 1,
813-
"b": "hello",
814-
"c": null
815-
},
816-
]);
795+
let json = json!(
796+
{
797+
"a": 1,
798+
"b": "hello",
799+
"c": 1
800+
});
817801

818802
let store_schema = fields_to_map(
819803
[
@@ -824,11 +808,14 @@ mod tests {
824808
.into_iter(),
825809
);
826810

827-
let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
828-
.to_data(None, None, None, SchemaVersion::V0)
829-
.unwrap();
830-
831-
assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
811+
assert!(Event::infer_schema(
812+
json.as_object().unwrap(),
813+
&store_schema,
814+
None,
815+
false,
816+
SchemaVersion::V0
817+
)
818+
.is_err());
832819
}
833820

834821
#[test]
@@ -860,9 +847,9 @@ mod tests {
860847
.to_data(None, None, None, SchemaVersion::V0)
861848
.unwrap();
862849
let (schema, _) =
863-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
850+
Event::infer_schema(&data[3], &store_schema, None, false, SchemaVersion::V0).unwrap();
864851
let rb =
865-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
852+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
866853

867854
assert_eq!(rb.num_rows(), 4);
868855
assert_eq!(rb.num_columns(), 5);
@@ -938,9 +925,9 @@ mod tests {
938925
.to_data(None, None, None, SchemaVersion::V1)
939926
.unwrap();
940927
let (schema, _) =
941-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V1).unwrap();
928+
Event::infer_schema(&data[3], &store_schema, None, false, SchemaVersion::V1).unwrap();
942929
let rb =
943-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V1).unwrap();
930+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V1).unwrap();
944931

945932
assert_eq!(rb.num_rows(), 4);
946933
assert_eq!(rb.num_columns(), 5);

src/event/format/mod.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub trait EventFormat: Sized {
109109
time_partition_limit: Option<NonZeroU32>,
110110
custom_partitions: Option<&String>,
111111
schema_version: SchemaVersion,
112-
) -> anyhow::Result<Self::Data>;
112+
) -> anyhow::Result<Vec<Self::Data>>;
113113

114114
fn infer_schema(
115115
data: &Self::Data,
@@ -119,7 +119,7 @@ pub trait EventFormat: Sized {
119119
schema_version: SchemaVersion,
120120
) -> anyhow::Result<(EventSchema, IsFirstEvent)>;
121121

122-
fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;
122+
fn decode(data: &[Self::Data], schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;
123123

124124
/// Updates inferred schema with `p_timestamp` field and ensures it adheres to expectations
125125
fn prepare_and_validate_schema(
@@ -156,7 +156,7 @@ pub trait EventFormat: Sized {
156156

157157
fn into_recordbatch(
158158
p_timestamp: DateTime<Utc>,
159-
data: Self::Data,
159+
data: &[Self::Data],
160160
schema: &EventSchema,
161161
time_partition: Option<&String>,
162162
schema_version: SchemaVersion,
@@ -234,7 +234,7 @@ pub fn update_field_type_in_schema(
234234
inferred_schema: Arc<Schema>,
235235
existing_schema: Option<&HashMap<String, Arc<Field>>>,
236236
time_partition: Option<&String>,
237-
log_records: Option<&[Json]>,
237+
log_records: Option<&Json>,
238238
schema_version: SchemaVersion,
239239
) -> Arc<Schema> {
240240
let mut updated_schema = inferred_schema.clone();
@@ -245,11 +245,9 @@ pub fn update_field_type_in_schema(
245245
updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema);
246246
}
247247

248-
if let Some(log_records) = log_records {
249-
for log_record in log_records {
250-
updated_schema =
251-
override_data_type(updated_schema.clone(), log_record.clone(), schema_version);
252-
}
248+
if let Some(log_record) = log_records {
249+
updated_schema =
250+
override_data_type(updated_schema.clone(), log_record.clone(), schema_version);
253251
}
254252

255253
let Some(time_partition) = time_partition else {

0 commit comments

Comments (0)