Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplification of static schema handling #1192

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
refactor: simplify flow with a map
de-sh committed Feb 18, 2025
commit 71202efc5691ee69804db7e7ac325303b95afd27
42 changes: 25 additions & 17 deletions src/static_schema.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
*
*/

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use serde::{Deserialize, Serialize};
@@ -60,26 +60,38 @@ impl StaticSchema {
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, StaticSchemaError> {
let mut schema: Vec<Arc<Field>> = Vec::new();
let mut time_partition_exists = false;

// Convert to hashmap for easy access and operation
let mut fields: HashMap<FieldName, FieldType> = self
.fields
.into_iter()
.map(|field| (field.name, field.data_type))
.collect();

// Ensures all custom partitions are present in schema
if let Some(custom_partition) = custom_partition {
for partition in custom_partition.split(',') {
if !self.fields.iter().any(|field| field.name == partition) {
if !fields.contains_key(partition) {
return Err(StaticSchemaError::MissingCustom(partition.to_owned()));
}
}
}
for mut field in self.fields {
if field.name == DEFAULT_TIMESTAMP_KEY {
return Err(StaticSchemaError::DefaultTime);
}

if !time_partition.is_empty() && field.name == time_partition {
time_partition_exists = true;
field.data_type = "datetime".to_string();
}
// Ensures the default timestamp field is not specified by the user (it is only inserted by Parseable)
if fields.contains_key(DEFAULT_TIMESTAMP_KEY) {
return Err(StaticSchemaError::DefaultTime);
}

let data_type = match field.data_type.as_str() {
// If time partitioning is enabled, mutate the datatype to be datetime
if !time_partition.is_empty() {
let Some(field_type) = fields.get_mut(time_partition) else {
return Err(StaticSchemaError::MissingTime(time_partition.to_owned()));
};
*field_type = "datetime".to_string();
}

for (field_name, field_type) in fields {
let data_type = match field_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
@@ -95,14 +107,10 @@ impl StaticSchema {
}
_ => DataType::Null,
};
let field = Field::new(&field.name, data_type, DEFAULT_NULLABLE);
let field = Field::new(&field_name, data_type, DEFAULT_NULLABLE);
schema.push(Arc::new(field));
}

if !time_partition.is_empty() && !time_partition_exists {
return Err(StaticSchemaError::MissingTime(time_partition.to_owned()));
}

// add the p_timestamp field to the event schema at the 0th index
schema.insert(
0,