Commit aa0befa

refactor: separate out flatten from schema inference
1 parent 218080e commit aa0befa

File tree

2 files changed (+82, -41 lines)


src/event/format/json.rs

+71, -38 lines
@@ -140,37 +140,43 @@ impl EventFormat for Event {
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
         self,
-        static_schema_flag: bool,
-        stored_schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         time_partition_limit: Option<NonZeroU32>,
         custom_partitions: Option<&String>,
         schema_version: SchemaVersion,
-    ) -> anyhow::Result<(Self::Data, Vec<Arc<Field>>, bool)> {
-        let flattened = self.flatten_logs(
+    ) -> anyhow::Result<Self::Data> {
+        self.flatten_logs(
             time_partition,
             time_partition_limit,
             custom_partitions,
             schema_version,
-        )?;
+        )
+    }

+    fn infer_schema(
+        data: &Self::Data,
+        stored_schema: &HashMap<String, Arc<Field>>,
+        time_partition: Option<&String>,
+        static_schema_flag: bool,
+        schema_version: SchemaVersion,
+    ) -> anyhow::Result<(super::EventSchema, bool)> {
         // collect all the keys from all the json objects in the request body
-        let fields = collect_keys(flattened.iter());
+        let fields = collect_keys(data.iter());

         let mut is_first = false;
         let schema = if let Some(schema) = derive_arrow_schema(stored_schema, fields) {
             schema
         } else {
             // TODO:
             let mut infer_schema = infer_json_schema_from_iterator(
-                flattened.iter().map(|obj| Ok(Value::Object(obj.clone()))),
+                data.iter().map(|obj| Ok(Value::Object(obj.clone()))),
             )
             .map_err(|err| anyhow!("Could not infer schema for this event due to err {:?}", err))?;
             let new_infer_schema = super::update_field_type_in_schema(
                 Arc::new(infer_schema),
                 Some(stored_schema),
                 time_partition,
-                Some(&flattened),
+                Some(data),
                 schema_version,
             );
             infer_schema = Schema::new(new_infer_schema.fields().clone());
@@ -194,7 +200,7 @@ impl EventFormat for Event {
                 .collect()
         };

-        if flattened
+        if data
             .iter()
             .any(|value| fields_mismatch(&schema, value, schema_version))
         {
@@ -205,7 +211,7 @@

         let schema = Self::prepare_and_validate_schema(schema, stored_schema, static_schema_flag)?;

-        Ok((flattened, schema, is_first))
+        Ok((schema, is_first))
     }

     // Convert the Data type (defined above) to arrow record batch
@@ -231,19 +237,24 @@
         let static_schema_flag = stream.get_static_schema_flag();
         let custom_partitions = stream.get_custom_partition();
         let schema_version = stream.get_schema_version();
-        let storage_schema = stream.get_schema_raw();
+        let stored_schema = stream.get_schema_raw();
        let stream_type = stream.get_stream_type();

         let p_timestamp = self.p_timestamp;
         let origin_size = self.origin_size;
-        let (data, schema, is_first_event) = self.to_data(
-            static_schema_flag,
-            &storage_schema,
+        let data = self.to_data(
             time_partition.as_ref(),
             time_partition_limit,
             custom_partitions.as_ref(),
             schema_version,
         )?;
+        let (schema, is_first_event) = Self::infer_schema(
+            &data,
+            &stored_schema,
+            time_partition.as_ref(),
+            static_schema_flag,
+            schema_version,
+        )?;

         let mut partitions = HashMap::new();
         for json in data {
@@ -500,9 +511,11 @@ mod tests {
         });

         let store_schema = HashMap::default();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -531,9 +544,11 @@
         });

         let store_schema = HashMap::default();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -564,9 +579,11 @@
             ]
             .into_iter(),
         );
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -598,9 +615,11 @@
             .into_iter(),
         );

-        assert!(Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0,)
-            .is_err());
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
+            .unwrap();
+
+        assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
     }

     #[test]

@@ -616,9 +635,11 @@
             .into_iter(),
         );

-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -645,9 +666,11 @@
         ]);

         let store_schema = HashMap::new();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -696,9 +719,11 @@
         ]);

         let store_schema = HashMap::new();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -746,9 +771,11 @@
             ]
             .into_iter(),
         );
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -797,9 +824,11 @@
             .into_iter(),
         );

-        assert!(Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0,)
-            .is_err());
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
+            .unwrap();
+
+        assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
     }

     #[test]

@@ -827,9 +856,11 @@
         ]);

         let store_schema = HashMap::new();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V0)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V0)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();

@@ -903,9 +934,11 @@
         ]);

         let store_schema = HashMap::new();
-        let (data, schema, _) = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
-            .to_data(false, &store_schema, None, None, None, SchemaVersion::V1)
+        let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json)
+            .to_data(None, None, None, SchemaVersion::V1)
             .unwrap();
+        let (schema, _) =
+            Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V1).unwrap();
         let rb =
             Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V1).unwrap();
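
With this split, flattening in json.rs needs only the event itself and the partition settings, while schema inference runs afterwards over the already-flattened objects and is the only step that consults the stored schema and the static-schema flag. Below is a minimal, std-only sketch of that two-phase shape; the Value enum, flatten, infer_schema, and the field names are illustrative stand-ins, not the crate's actual API.

use std::collections::{BTreeMap, HashMap};

// Toy value type standing in for a JSON value.
enum Value {
    String(String),
    Object(BTreeMap<String, Value>),
}

// Phase 1: flattening. Walks nested objects into dotted keys and needs nothing
// beyond the event itself (no stored schema, no static-schema flag).
fn flatten(value: &Value, prefix: &str, out: &mut BTreeMap<String, String>) {
    match value {
        Value::String(s) => {
            out.insert(prefix.to_string(), s.clone());
        }
        Value::Object(map) => {
            for (key, child) in map {
                let key = if prefix.is_empty() {
                    key.clone()
                } else {
                    format!("{prefix}.{key}")
                };
                flatten(child, &key, out);
            }
        }
    }
}

// Phase 2: schema inference. Runs over the already-flattened data, consults the
// stored schema, and reports whether this looks like the stream's first event.
fn infer_schema(
    data: &BTreeMap<String, String>,
    stored_schema: &HashMap<String, String>,
) -> (Vec<String>, bool) {
    let is_first = stored_schema.is_empty();
    let fields = data.keys().cloned().collect();
    (fields, is_first)
}

fn main() {
    let event = Value::Object(BTreeMap::from([(
        "resource".to_string(),
        Value::Object(BTreeMap::from([(
            "host".to_string(),
            Value::String("web-1".to_string()),
        )])),
    )]));

    let mut flat = BTreeMap::new();
    flatten(&event, "", &mut flat); // step 1: flatten only
    let (schema, is_first) = infer_schema(&flat, &HashMap::new()); // step 2: infer separately
    println!("fields = {schema:?}, first event = {is_first}");
}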

src/event/format/mod.rs

+11, -3 lines
@@ -96,20 +96,28 @@ impl Display for LogSource {
     }
 }

+pub type IsFirstEvent = bool;
+
 // Global Trait for event format
 // This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
     type Data;

     fn to_data(
         self,
-        static_schema_flag: bool,
-        stored_schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         time_partition_limit: Option<NonZeroU32>,
         custom_partitions: Option<&String>,
         schema_version: SchemaVersion,
-    ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
+    ) -> anyhow::Result<Self::Data>;
+
+    fn infer_schema(
+        data: &Self::Data,
+        stored_schema: &HashMap<String, Arc<Field>>,
+        time_partition: Option<&String>,
+        static_schema_flag: bool,
+        schema_version: SchemaVersion,
+    ) -> anyhow::Result<(EventSchema, IsFirstEvent)>;

     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
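
Callers of the reshaped trait now drive two methods in sequence instead of one combined call, as into_event does above. The sketch below shows that calling pattern against a toy Format trait and LineFormat implementation that only mirror the new signatures; none of these names belong to the crate.

use std::collections::HashMap;

type IsFirstEvent = bool;

// Toy mirror of the reshaped trait: flattening and inference are separate steps.
trait Format: Sized {
    type Data;

    fn to_data(self) -> Result<Self::Data, String>;
    fn infer_schema(
        data: &Self::Data,
        stored_schema: &HashMap<String, String>,
    ) -> Result<(Vec<String>, IsFirstEvent), String>;
}

// Minimal implementation: one line of "key=value" pairs per event.
struct LineFormat(String);

impl Format for LineFormat {
    type Data = Vec<(String, String)>;

    fn to_data(self) -> Result<Self::Data, String> {
        self.0
            .split_whitespace()
            .map(|pair| {
                pair.split_once('=')
                    .map(|(k, v)| (k.to_string(), v.to_string()))
                    .ok_or_else(|| format!("malformed pair: {pair}"))
            })
            .collect()
    }

    fn infer_schema(
        data: &Self::Data,
        stored_schema: &HashMap<String, String>,
    ) -> Result<(Vec<String>, IsFirstEvent), String> {
        let fields = data.iter().map(|(k, _)| k.clone()).collect();
        Ok((fields, stored_schema.is_empty()))
    }
}

fn main() -> Result<(), String> {
    let stored_schema = HashMap::new();
    // Same order into_event now uses: flatten first, infer against the stored schema second.
    let data = LineFormat("level=info status=200".to_string()).to_data()?;
    let (schema, is_first) = LineFormat::infer_schema(&data, &stored_schema)?;
    println!("fields = {schema:?}, first event = {is_first}");
    Ok(())
}

The point of the shape is that the Data produced by to_data can be inspected, cached, or checked against different stored schemas without re-flattening.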
