From 156174247e8e5086356a88bedce296193bff0ae6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 16:35:36 +0530 Subject: [PATCH 1/8] refactor: simplification of static schema handling --- src/parseable/mod.rs | 62 +++++-------- src/static_schema.rs | 205 +++++++++++++++++++------------------------ 2 files changed, 111 insertions(+), 156 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 2fba1d5b5..d59669db0 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -47,7 +47,7 @@ use crate::{ }, metadata::{LogStreamMetadata, SchemaVersion}, option::Mode, - static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, + static_schema::StaticSchema, storage::{ object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider, ObjectStoreFormat, Owner, Permisssion, StreamType, @@ -485,13 +485,28 @@ impl Parseable { } } - let schema = validate_static_schema( - body, - stream_name, - &time_partition, - custom_partition.as_ref(), - static_schema_flag, - )?; + let schema = if static_schema_flag { + if body.is_empty() { + return Err(CreateStreamError::Custom { + msg: format!( + "Please provide schema in the request body for static schema logstream {stream_name}" + ), + status: StatusCode::BAD_REQUEST, + }.into()); + } + + let static_schema: StaticSchema = serde_json::from_slice(body)?; + static_schema + .convert_to_arrow_schema(&time_partition, custom_partition.as_ref()) + .map_err(|_| CreateStreamError::Custom { + msg: format!( + "Unable to commit static schema, logstream {stream_name} not created" + ), + status: StatusCode::BAD_REQUEST, + })? + } else { + Arc::new(Schema::empty()) + }; self.create_stream( stream_name.to_string(), @@ -778,37 +793,6 @@ impl Parseable { } } -pub fn validate_static_schema( - body: &Bytes, - stream_name: &str, - time_partition: &str, - custom_partition: Option<&String>, - static_schema_flag: bool, -) -> Result, CreateStreamError> { - if !static_schema_flag { - return Ok(Arc::new(Schema::empty())); - } - - if body.is_empty() { - return Err(CreateStreamError::Custom { - msg: format!( - "Please provide schema in the request body for static schema logstream {stream_name}" - ), - status: StatusCode::BAD_REQUEST, - }); - } - - let static_schema: StaticSchema = serde_json::from_slice(body)?; - let parsed_schema = - convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition) - .map_err(|_| CreateStreamError::Custom { - msg: format!("Unable to commit static schema, logstream {stream_name} not created"), - status: StatusCode::BAD_REQUEST, - })?; - - Ok(parsed_schema) -} - pub fn validate_time_partition_limit( time_partition_limit: &str, ) -> Result { diff --git a/src/static_schema.rs b/src/static_schema.rs index 5b1a5cada..a9e4680e6 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -29,19 +29,100 @@ pub struct StaticSchema { fields: Vec, } +impl StaticSchema { + pub fn convert_to_arrow_schema( + self, + time_partition: &str, + custom_partition: Option<&String>, + ) -> Result, AnyError> { + let mut fields = Vec::new(); + let mut time_partition_exists = false; + + if let Some(custom_partition) = custom_partition { + for partition in custom_partition.split(',') { + if !self.fields.iter().any(|field| field.name == partition) { + return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); + } + } + } + for mut field in self.fields { + if !time_partition.is_empty() && field.name == time_partition { + time_partition_exists = true; + field.data_type = "datetime".to_string(); + } + + let parsed_field = Fields { + name: field.name.clone(), + + data_type: { + match field.data_type.as_str() { + "int" => DataType::Int64, + "double" | "float" => DataType::Float64, + "boolean" => DataType::Boolean, + "string" => DataType::Utf8, + "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None), + "string_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) + } + "int_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) + } + "double_list" | "float_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))) + } + "boolean_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))) + } + _ => DataType::Null, + } + }, + nullable: default_nullable(), + dict_id: default_dict_id(), + dict_is_ordered: default_dict_is_ordered(), + metadata: HashMap::new(), + }; + + fields.push(parsed_field); + } + + if !time_partition.is_empty() && !time_partition_exists { + return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream")); + } + + let mut schema: Vec> = Vec::new(); + for field in fields { + let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); + schema.push(Arc::new(field)); + } + + if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { + return Err(anyhow!( + "field {} is a reserved field", + DEFAULT_TIMESTAMP_KEY + )); + }; + + // add the p_timestamp field to the event schema to the 0th index + schema.insert( + 0, + Arc::new(Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + )), + ); + + // prepare the record batch and new fields to be added + Ok(Arc::new(Schema::new(schema))) + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct SchemaFields { name: String, data_type: String, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ParsedSchema { - pub fields: Vec, - pub metadata: HashMap, -} - #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Fields { @@ -56,116 +137,6 @@ pub struct Fields { #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Metadata {} -pub fn convert_static_schema_to_arrow_schema( - static_schema: StaticSchema, - time_partition: &str, - custom_partition: Option<&String>, -) -> Result, AnyError> { - let mut parsed_schema = ParsedSchema { - fields: Vec::new(), - metadata: HashMap::new(), - }; - let mut time_partition_exists = false; - - if let Some(custom_partition) = custom_partition { - let custom_partition_list = custom_partition.split(',').collect::>(); - let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len()); - - for partition in &custom_partition_list { - if static_schema - .fields - .iter() - .any(|field| &field.name == partition) - { - custom_partition_exists.insert(partition.to_string(), true); - } - } - - for partition in &custom_partition_list { - if !custom_partition_exists.contains_key(*partition) { - return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); - } - } - } - for mut field in static_schema.fields { - if !time_partition.is_empty() && field.name == time_partition { - time_partition_exists = true; - field.data_type = "datetime".to_string(); - } - - let parsed_field = Fields { - name: field.name.clone(), - - data_type: { - match field.data_type.as_str() { - "int" => DataType::Int64, - "double" | "float" => DataType::Float64, - "boolean" => DataType::Boolean, - "string" => DataType::Utf8, - "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None), - "string_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) - } - "int_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) - } - "double_list" | "float_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Float64, true))) - } - "boolean_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))) - } - _ => DataType::Null, - } - }, - nullable: default_nullable(), - dict_id: default_dict_id(), - dict_is_ordered: default_dict_is_ordered(), - metadata: HashMap::new(), - }; - - parsed_schema.fields.push(parsed_field); - } - if !time_partition.is_empty() && !time_partition_exists { - return Err(anyhow! { - format!( - "time partition field {time_partition} does not exist in the schema for the static schema logstream" - ), - }); - } - add_parseable_fields_to_static_schema(parsed_schema) -} - -fn add_parseable_fields_to_static_schema( - parsed_schema: ParsedSchema, -) -> Result, AnyError> { - let mut schema: Vec> = Vec::new(); - for field in parsed_schema.fields.iter() { - let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); - schema.push(Arc::new(field)); - } - - if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(anyhow!( - "field {} is a reserved field", - DEFAULT_TIMESTAMP_KEY - )); - }; - - // add the p_timestamp field to the event schema to the 0th index - schema.insert( - 0, - Arc::new(Field::new( - DEFAULT_TIMESTAMP_KEY, - DataType::Timestamp(TimeUnit::Millisecond, None), - true, - )), - ); - - // prepare the record batch and new fields to be added - let schema = Arc::new(Schema::new(schema)); - Ok(schema) -} fn default_nullable() -> bool { true From dc09121db22e894173defb6cc15c8b21fcc117e1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 11:53:56 +0530 Subject: [PATCH 2/8] refactor: custom error type --- src/parseable/mod.rs | 4 ++-- src/static_schema.rs | 40 +++++++++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index d59669db0..7c22158a0 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -498,9 +498,9 @@ impl Parseable { let static_schema: StaticSchema = serde_json::from_slice(body)?; static_schema .convert_to_arrow_schema(&time_partition, custom_partition.as_ref()) - .map_err(|_| CreateStreamError::Custom { + .map_err(|err| CreateStreamError::Custom { msg: format!( - "Unable to commit static schema, logstream {stream_name} not created" + "Unable to commit static schema, logstream {stream_name} not created; Error: {err}" ), status: StatusCode::BAD_REQUEST, })? diff --git a/src/static_schema.rs b/src/static_schema.rs index a9e4680e6..1e8b0e695 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -16,32 +16,47 @@ * */ -use crate::event::DEFAULT_TIMESTAMP_KEY; -use crate::utils::arrow::get_field; -use anyhow::{anyhow, Error as AnyError}; -use serde::{Deserialize, Serialize}; -use std::str; +use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use std::{collections::HashMap, sync::Arc}; +use serde::{Deserialize, Serialize}; + +use crate::{event::DEFAULT_TIMESTAMP_KEY, utils::arrow::get_field}; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec, } +#[derive(Debug, thiserror::Error)] +pub enum StaticSchemaError { + #[error( + "custom partition field {0} does not exist in the schema for the static schema logstream" + )] + MissingCustomPartition(String), + #[error( + "time partition field {0} does not exist in the schema for the static schema logstream" + )] + MissingTimePartition(String), + #[error("field {DEFAULT_TIMESTAMP_KEY} is a reserved field")] + MissingDefaultTimePartition, +} + impl StaticSchema { pub fn convert_to_arrow_schema( self, time_partition: &str, custom_partition: Option<&String>, - ) -> Result, AnyError> { + ) -> Result, StaticSchemaError> { let mut fields = Vec::new(); let mut time_partition_exists = false; if let Some(custom_partition) = custom_partition { for partition in custom_partition.split(',') { if !self.fields.iter().any(|field| field.name == partition) { - return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); + return Err(StaticSchemaError::MissingCustomPartition( + partition.to_owned(), + )); } } } @@ -86,7 +101,9 @@ impl StaticSchema { } if !time_partition.is_empty() && !time_partition_exists { - return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream")); + return Err(StaticSchemaError::MissingTimePartition( + time_partition.to_owned(), + )); } let mut schema: Vec> = Vec::new(); @@ -96,10 +113,7 @@ impl StaticSchema { } if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(anyhow!( - "field {} is a reserved field", - DEFAULT_TIMESTAMP_KEY - )); + return Err(StaticSchemaError::MissingDefaultTimePartition); }; // add the p_timestamp field to the event schema to the 0th index From 53ac33f0b4f38db21827a975dcdb26f2bc0eb108 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 12:00:36 +0530 Subject: [PATCH 3/8] style: clippy suggestion --- src/static_schema.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index 1e8b0e695..22adb78ce 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -33,13 +33,13 @@ pub enum StaticSchemaError { #[error( "custom partition field {0} does not exist in the schema for the static schema logstream" )] - MissingCustomPartition(String), + MissingCustom(String), #[error( "time partition field {0} does not exist in the schema for the static schema logstream" )] - MissingTimePartition(String), + MissingTime(String), #[error("field {DEFAULT_TIMESTAMP_KEY} is a reserved field")] - MissingDefaultTimePartition, + DefaultTime, } impl StaticSchema { @@ -54,9 +54,7 @@ impl StaticSchema { if let Some(custom_partition) = custom_partition { for partition in custom_partition.split(',') { if !self.fields.iter().any(|field| field.name == partition) { - return Err(StaticSchemaError::MissingCustomPartition( - partition.to_owned(), - )); + return Err(StaticSchemaError::MissingCustom(partition.to_owned())); } } } @@ -101,9 +99,7 @@ impl StaticSchema { } if !time_partition.is_empty() && !time_partition_exists { - return Err(StaticSchemaError::MissingTimePartition( - time_partition.to_owned(), - )); + return Err(StaticSchemaError::MissingTime(time_partition.to_owned())); } let mut schema: Vec> = Vec::new(); @@ -113,7 +109,7 @@ impl StaticSchema { } if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(StaticSchemaError::MissingDefaultTimePartition); + return Err(StaticSchemaError::DefaultTime); }; // add the p_timestamp field to the event schema to the 0th index From f864c0ebca916c184d7cb67950a3593abb01ff7c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 12:02:42 +0530 Subject: [PATCH 4/8] perf iterate less --- src/static_schema.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index 22adb78ce..d6ab26060 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -21,7 +21,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use serde::{Deserialize, Serialize}; -use crate::{event::DEFAULT_TIMESTAMP_KEY, utils::arrow::get_field}; +use crate::event::DEFAULT_TIMESTAMP_KEY; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { @@ -59,6 +59,10 @@ impl StaticSchema { } } for mut field in self.fields { + if field.name == DEFAULT_TIMESTAMP_KEY { + return Err(StaticSchemaError::DefaultTime); + } + if !time_partition.is_empty() && field.name == time_partition { time_partition_exists = true; field.data_type = "datetime".to_string(); @@ -108,10 +112,6 @@ impl StaticSchema { schema.push(Arc::new(field)); } - if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(StaticSchemaError::DefaultTime); - }; - // add the p_timestamp field to the event schema to the 0th index schema.insert( 0, From 7616d47e96c81100552ba4a24ac0f5f9718b7acc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 12:05:06 +0530 Subject: [PATCH 5/8] put it in quotes --- src/static_schema.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index d6ab26060..bbc99ef27 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -38,7 +38,7 @@ pub enum StaticSchemaError { "time partition field {0} does not exist in the schema for the static schema logstream" )] MissingTime(String), - #[error("field {DEFAULT_TIMESTAMP_KEY} is a reserved field")] + #[error("field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field")] DefaultTime, } From 3cee27198c95da8a98229ff8bc245239627ddfb0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 13:12:36 +0530 Subject: [PATCH 6/8] refactor: directly construct `Field` --- src/static_schema.rs | 85 +++++++++++--------------------------------- 1 file changed, 21 insertions(+), 64 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index bbc99ef27..f7f930ea0 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -16,13 +16,15 @@ * */ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use serde::{Deserialize, Serialize}; use crate::event::DEFAULT_TIMESTAMP_KEY; +const DEFAULT_NULLABLE: bool = true; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec, @@ -48,7 +50,7 @@ impl StaticSchema { time_partition: &str, custom_partition: Option<&String>, ) -> Result, StaticSchemaError> { - let mut fields = Vec::new(); + let mut schema: Vec> = Vec::new(); let mut time_partition_exists = false; if let Some(custom_partition) = custom_partition { @@ -68,50 +70,30 @@ impl StaticSchema { field.data_type = "datetime".to_string(); } - let parsed_field = Fields { - name: field.name.clone(), - - data_type: { - match field.data_type.as_str() { - "int" => DataType::Int64, - "double" | "float" => DataType::Float64, - "boolean" => DataType::Boolean, - "string" => DataType::Utf8, - "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None), - "string_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))) - } - "int_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Int64, true))) - } - "double_list" | "float_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Float64, true))) - } - "boolean_list" => { - DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))) - } - _ => DataType::Null, - } - }, - nullable: default_nullable(), - dict_id: default_dict_id(), - dict_is_ordered: default_dict_is_ordered(), - metadata: HashMap::new(), + let data_type = match field.data_type.as_str() { + "int" => DataType::Int64, + "double" | "float" => DataType::Float64, + "boolean" => DataType::Boolean, + "string" => DataType::Utf8, + "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None), + "string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), + "int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))), + "double_list" | "float_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Float64, true))) + } + "boolean_list" => { + DataType::List(Arc::new(Field::new("item", DataType::Boolean, true))) + } + _ => DataType::Null, }; - - fields.push(parsed_field); + let field = Field::new(&field.name, data_type, DEFAULT_NULLABLE); + schema.push(Arc::new(field)); } if !time_partition.is_empty() && !time_partition_exists { return Err(StaticSchemaError::MissingTime(time_partition.to_owned())); } - let mut schema: Vec> = Vec::new(); - for field in fields { - let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable); - schema.push(Arc::new(field)); - } - // add the p_timestamp field to the event schema to the 0th index schema.insert( 0, @@ -132,28 +114,3 @@ pub struct SchemaFields { name: String, data_type: String, } - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Fields { - name: String, - data_type: DataType, - nullable: bool, - dict_id: i64, - dict_is_ordered: bool, - metadata: HashMap, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] - -pub struct Metadata {} - -fn default_nullable() -> bool { - true -} -fn default_dict_id() -> i64 { - 0 -} -fn default_dict_is_ordered() -> bool { - false -} From 7f2ee9a4b6045982abd2c274f036d16f6450416c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 13:51:07 +0530 Subject: [PATCH 7/8] style: type names --- src/static_schema.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index f7f930ea0..aeb8c83fb 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -25,10 +25,8 @@ use crate::event::DEFAULT_TIMESTAMP_KEY; const DEFAULT_NULLABLE: bool = true; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct StaticSchema { - fields: Vec, -} +type FieldName = String; +type FieldType = String; #[derive(Debug, thiserror::Error)] pub enum StaticSchemaError { @@ -44,6 +42,17 @@ pub enum StaticSchemaError { DefaultTime, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +struct SchemaFields { + name: FieldName, + data_type: FieldType, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct StaticSchema { + fields: Vec, +} + impl StaticSchema { pub fn convert_to_arrow_schema( self, @@ -108,9 +117,3 @@ impl StaticSchema { Ok(Arc::new(Schema::new(schema))) } } - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SchemaFields { - name: String, - data_type: String, -} From 71202efc5691ee69804db7e7ac325303b95afd27 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 18 Feb 2025 14:04:34 +0530 Subject: [PATCH 8/8] refactor: simplify flow with a map --- src/static_schema.rs | 42 +++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/static_schema.rs b/src/static_schema.rs index aeb8c83fb..d4b668250 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -16,7 +16,7 @@ * */ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use serde::{Deserialize, Serialize}; @@ -60,26 +60,38 @@ impl StaticSchema { custom_partition: Option<&String>, ) -> Result, StaticSchemaError> { let mut schema: Vec> = Vec::new(); - let mut time_partition_exists = false; + // Convert to hashmap for easy access and operation + let mut fields: HashMap = self + .fields + .into_iter() + .map(|field| (field.name, field.data_type)) + .collect(); + + // Ensures all custom partitions are present in schema if let Some(custom_partition) = custom_partition { for partition in custom_partition.split(',') { - if !self.fields.iter().any(|field| field.name == partition) { + if !fields.contains_key(partition) { return Err(StaticSchemaError::MissingCustom(partition.to_owned())); } } } - for mut field in self.fields { - if field.name == DEFAULT_TIMESTAMP_KEY { - return Err(StaticSchemaError::DefaultTime); - } - if !time_partition.is_empty() && field.name == time_partition { - time_partition_exists = true; - field.data_type = "datetime".to_string(); - } + // Ensures default timestamp is not mentioned(as it is only inserted by parseable) + if fields.contains_key(DEFAULT_TIMESTAMP_KEY) { + return Err(StaticSchemaError::DefaultTime); + } - let data_type = match field.data_type.as_str() { + // If time partitioning is enabled, mutate the datatype to be datetime + if !time_partition.is_empty() { + let Some(field_type) = fields.get_mut(time_partition) else { + return Err(StaticSchemaError::MissingTime(time_partition.to_owned())); + }; + *field_type = "datetime".to_string(); + } + + for (field_name, field_type) in fields { + let data_type = match field_type.as_str() { "int" => DataType::Int64, "double" | "float" => DataType::Float64, "boolean" => DataType::Boolean, @@ -95,14 +107,10 @@ impl StaticSchema { } _ => DataType::Null, }; - let field = Field::new(&field.name, data_type, DEFAULT_NULLABLE); + let field = Field::new(&field_name, data_type, DEFAULT_NULLABLE); schema.push(Arc::new(field)); } - if !time_partition.is_empty() && !time_partition_exists { - return Err(StaticSchemaError::MissingTime(time_partition.to_owned())); - } - // add the p_timestamp field to the event schema to the 0th index schema.insert( 0,