Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplification of static schema handling #1192

Open
wants to merge 12 commits into base branch: main
62 changes: 23 additions & 39 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
},
metadata::{LogStreamMetadata, SchemaVersion},
option::Mode,
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
static_schema::StaticSchema,
storage::{
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
ObjectStoreFormat, Owner, Permisssion, StreamType,
Expand Down Expand Up @@ -485,13 +485,28 @@ impl Parseable {
}
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
custom_partition.as_ref(),
static_schema_flag,
)?;
let schema = if static_schema_flag {
if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
}.into());
}

let static_schema: StaticSchema = serde_json::from_slice(body)?;
static_schema
.convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
.map_err(|_| CreateStreamError::Custom {
msg: format!(
"Unable to commit static schema, logstream {stream_name} not created"
),
status: StatusCode::BAD_REQUEST,
})?
} else {
Arc::new(Schema::empty())
};

self.create_stream(
stream_name.to_string(),
Expand Down Expand Up @@ -778,37 +793,6 @@ impl Parseable {
}
}

/// Builds the Arrow schema for a new logstream from the request body.
///
/// For dynamic-schema streams (`static_schema_flag == false`) the stream
/// starts with an empty schema that is inferred from events later. For
/// static-schema streams the body must contain a JSON `StaticSchema`,
/// which is validated against the time/custom partitions and converted
/// to an Arrow schema.
///
/// # Errors
/// Returns `CreateStreamError::Custom` with `400 Bad Request` when the
/// body is empty or the schema fails validation/conversion.
pub fn validate_static_schema(
    body: &Bytes,
    stream_name: &str,
    time_partition: &str,
    custom_partition: Option<&String>,
    static_schema_flag: bool,
) -> Result<Arc<Schema>, CreateStreamError> {
    // Dynamic streams: nothing to validate, schema is inferred later.
    if !static_schema_flag {
        return Ok(Arc::new(Schema::empty()));
    }

    // A static-schema stream must ship its schema in the request body.
    if body.is_empty() {
        return Err(CreateStreamError::Custom {
            msg: format!(
                "Please provide schema in the request body for static schema logstream {stream_name}"
            ),
            status: StatusCode::BAD_REQUEST,
        });
    }

    let static_schema: StaticSchema = serde_json::from_slice(body)?;
    convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
        .map_err(|_| CreateStreamError::Custom {
            msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
            status: StatusCode::BAD_REQUEST,
        })
}

pub fn validate_time_partition_limit(
time_partition_limit: &str,
) -> Result<NonZeroU32, CreateStreamError> {
Expand Down
205 changes: 88 additions & 117 deletions src/static_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,100 @@ pub struct StaticSchema {
fields: Vec<SchemaFields>,
}

impl StaticSchema {
pub fn convert_to_arrow_schema(
self,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, AnyError> {
let mut fields = Vec::new();
let mut time_partition_exists = false;

if let Some(custom_partition) = custom_partition {
for partition in custom_partition.split(',') {
if !self.fields.iter().any(|field| field.name == partition) {
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
}
}
}
for mut field in self.fields {
if !time_partition.is_empty() && field.name == time_partition {
time_partition_exists = true;
field.data_type = "datetime".to_string();
}

let parsed_field = Fields {
name: field.name.clone(),

data_type: {
match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
}
"int_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
}
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
}
},
nullable: default_nullable(),
dict_id: default_dict_id(),
dict_is_ordered: default_dict_is_ordered(),
metadata: HashMap::new(),
};

fields.push(parsed_field);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Return an explicit error for unrecognized data types instead of defaulting to null.

Falling back to DataType::Null for unknown types can silently mask typos or misunderstandings in the schema. Consider returning an error to highlight the invalid type rather than converting it to null.

- _ => DataType::Null,
+ other => {
+     return Err(anyhow!("Unsupported or unrecognized data type: {other}"));
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let parsed_field = Fields {
name: field.name.clone(),
data_type: {
match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
}
"int_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
}
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
}
},
nullable: default_nullable(),
dict_id: default_dict_id(),
dict_is_ordered: default_dict_is_ordered(),
metadata: HashMap::new(),
};
fields.push(parsed_field);
}
let parsed_field = Fields {
name: field.name.clone(),
data_type: {
match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
}
"int_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
}
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
other => {
return Err(anyhow!("Unsupported or unrecognized data type: {other}"));
}
}
},
nullable: default_nullable(),
dict_id: default_dict_id(),
dict_is_ordered: default_dict_is_ordered(),
metadata: HashMap::new(),
};
fields.push(parsed_field);


if !time_partition.is_empty() && !time_partition_exists {
return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream"));
}

let mut schema: Vec<Arc<Field>> = Vec::new();
for field in fields {
let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
schema.push(Arc::new(field));
}

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
));
};

// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
Ok(Arc::new(Schema::new(schema)))
}
}

/// One user-declared column of a static schema: the column name plus a
/// string type tag (e.g. "int", "string", "datetime") that is later mapped
/// onto an Arrow `DataType` by `convert_static_schema_to_arrow_schema`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaFields {
name: String,
data_type: String,
}

/// Intermediate form of a static schema after type-tag resolution: fully
/// typed `Fields` plus schema-level metadata, ready to be turned into an
/// Arrow `Schema`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedSchema {
pub fields: Vec<Fields>,
pub metadata: HashMap<String, String>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Fields {
Expand All @@ -56,116 +137,6 @@ pub struct Fields {
/// Field-level metadata placeholder; currently carries no data.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]

pub struct Metadata {}
/// Converts a user-supplied `StaticSchema` into an Arrow schema.
///
/// Validates that every comma-separated custom partition and the time
/// partition (when non-empty) name an existing field, maps the string type
/// tags onto Arrow `DataType`s, and delegates to
/// `add_parseable_fields_to_static_schema` to prepend the reserved
/// timestamp column.
///
/// # Errors
/// Returns an error when a partition field is missing from the schema or a
/// field declares an unrecognized data type.
pub fn convert_static_schema_to_arrow_schema(
    static_schema: StaticSchema,
    time_partition: &str,
    custom_partition: Option<&String>,
) -> Result<Arc<Schema>, AnyError> {
    let mut parsed_schema = ParsedSchema {
        fields: Vec::with_capacity(static_schema.fields.len()),
        metadata: HashMap::new(),
    };
    let mut time_partition_exists = false;

    // Every custom partition must refer to a declared field.
    if let Some(custom_partition) = custom_partition {
        for partition in custom_partition.split(',') {
            if !static_schema
                .fields
                .iter()
                .any(|field| field.name == partition)
            {
                return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
            }
        }
    }

    for mut field in static_schema.fields {
        // Force the time-partition column to a timestamp type.
        if !time_partition.is_empty() && field.name == time_partition {
            time_partition_exists = true;
            field.data_type = "datetime".to_string();
        }

        let data_type = match field.data_type.as_str() {
            "int" => DataType::Int64,
            "double" | "float" => DataType::Float64,
            "boolean" => DataType::Boolean,
            "string" => DataType::Utf8,
            "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
            "string_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
            }
            "int_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
            }
            "double_list" | "float_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
            }
            "boolean_list" => {
                DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
            }
            // Fail loudly on an unknown type tag instead of silently
            // mapping it to a null column (which masks schema typos).
            other => {
                return Err(anyhow!(
                    "unsupported or unrecognized data type {other} for field {} in the static schema",
                    field.name
                ))
            }
        };

        parsed_schema.fields.push(Fields {
            name: field.name,
            data_type,
            nullable: default_nullable(),
            dict_id: default_dict_id(),
            dict_is_ordered: default_dict_is_ordered(),
            metadata: HashMap::new(),
        });
    }

    if !time_partition.is_empty() && !time_partition_exists {
        return Err(anyhow!(
            "time partition field {time_partition} does not exist in the schema for the static schema logstream"
        ));
    }
    add_parseable_fields_to_static_schema(parsed_schema)
}

/// Finalizes a `ParsedSchema` into an Arrow `Schema`, inserting the
/// reserved `p_timestamp` column at index 0.
///
/// # Errors
/// Returns an error when the user schema already declares the reserved
/// `DEFAULT_TIMESTAMP_KEY` field.
fn add_parseable_fields_to_static_schema(
    parsed_schema: ParsedSchema,
) -> Result<Arc<Schema>, AnyError> {
    // Materialize the parsed fields as Arrow fields.
    let mut schema: Vec<Arc<Field>> = parsed_schema
        .fields
        .iter()
        .map(|f| Arc::new(Field::new(f.name.clone(), f.data_type.clone(), f.nullable)))
        .collect();

    // The internal timestamp column is reserved; users may not declare it.
    if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
        return Err(anyhow!(
            "field {} is a reserved field",
            DEFAULT_TIMESTAMP_KEY
        ));
    };

    // p_timestamp always occupies index 0 of the event schema.
    schema.insert(
        0,
        Arc::new(Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )),
    );

    Ok(Arc::new(Schema::new(schema)))
}

fn default_nullable() -> bool {
true
Expand Down
Loading