diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 25558a46d..484f773bc 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -47,7 +47,7 @@ use crate::{
     },
     metadata::{LogStreamMetadata, SchemaVersion},
     option::Mode,
-    static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
+    static_schema::StaticSchema,
     storage::{
         object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
         ObjectStoreFormat, Owner, Permisssion, StreamType,
@@ -462,13 +462,35 @@ impl Parseable {
             validate_custom_partition(custom_partition)?;
         }
 
-        let schema = validate_static_schema(
-            body,
-            stream_name,
-            &time_partition,
-            custom_partition.as_ref(),
-            static_schema_flag,
-        )?;
+        if !time_partition.is_empty() && custom_partition.is_some() {
+            return Err(StreamError::Custom {
+                msg: "Cannot set both time partition and custom partition".to_string(),
+                status: StatusCode::BAD_REQUEST,
+            });
+        }
+
+        let schema = if static_schema_flag {
+            if body.is_empty() {
+                return Err(CreateStreamError::Custom {
+                    msg: format!(
+                        "Please provide schema in the request body for static schema logstream {stream_name}"
+                    ),
+                    status: StatusCode::BAD_REQUEST,
+                }.into());
+            }
+
+            let static_schema: StaticSchema = serde_json::from_slice(body)?;
+            static_schema
+                .convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
+                .map_err(|err| CreateStreamError::Custom {
+                    msg: format!(
+                        "Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
+                    ),
+                    status: StatusCode::BAD_REQUEST,
+                })?
+        } else {
+            Arc::new(Schema::empty())
+        };
 
         self.create_stream(
             stream_name.to_string(),
@@ -761,37 +783,6 @@ impl Parseable {
     }
 }
 
-pub fn validate_static_schema(
-    body: &Bytes,
-    stream_name: &str,
-    time_partition: &str,
-    custom_partition: Option<&String>,
-    static_schema_flag: bool,
-) -> Result<Arc<Schema>, CreateStreamError> {
-    if !static_schema_flag {
-        return Ok(Arc::new(Schema::empty()));
-    }
-
-    if body.is_empty() {
-        return Err(CreateStreamError::Custom {
-            msg: format!(
-                "Please provide schema in the request body for static schema logstream {stream_name}"
-            ),
-            status: StatusCode::BAD_REQUEST,
-        });
-    }
-
-    let static_schema: StaticSchema = serde_json::from_slice(body)?;
-    let parsed_schema =
-        convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
-            .map_err(|_| CreateStreamError::Custom {
-                msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
-                status: StatusCode::BAD_REQUEST,
-            })?;
-
-    Ok(parsed_schema)
-}
-
 pub fn validate_time_partition_limit(
     time_partition_limit: &str,
 ) -> Result<NonZeroU32, CreateStreamError> {
diff --git a/src/static_schema.rs b/src/static_schema.rs
index 286ec65ad..bb7fe715a 100644
--- a/src/static_schema.rs
+++ b/src/static_schema.rs
@@ -16,184 +16,17 @@
  *
  */
 
-use crate::event::DEFAULT_TIMESTAMP_KEY;
-use crate::utils::arrow::get_field;
-use serde::{Deserialize, Serialize};
-use std::str;
+use std::{collections::HashSet, sync::Arc};
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct StaticSchema {
-    fields: Vec<SchemaFields>,
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct SchemaFields {
-    name: String,
-    data_type: String,
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ParsedSchema {
-    pub fields: Vec<Fields>,
-    pub metadata: HashMap<String, String>,
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Fields {
-    name: String,
-    data_type: DataType,
-    nullable: bool,
-    dict_id: i64,
-    dict_is_ordered: bool,
-    metadata: HashMap<String, String>,
-}
-
-#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct Metadata {}
-pub fn convert_static_schema_to_arrow_schema(
-    static_schema: StaticSchema,
-    time_partition: &str,
-    custom_partition: Option<&String>,
-) -> Result<Arc<Schema>, StaticSchemaError> {
-    let mut parsed_schema = ParsedSchema {
-        fields: Vec::new(),
-        metadata: HashMap::new(),
-    };
-    let mut time_partition_exists = false;
-
-    if let Some(custom_partition) = custom_partition {
-        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
-        let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());
-
-        for partition in &custom_partition_list {
-            if static_schema
-                .fields
-                .iter()
-                .any(|field| &field.name == partition)
-            {
-                custom_partition_exists.insert(partition.to_string(), true);
-            }
-        }
-
-        for partition in &custom_partition_list {
-            if !custom_partition_exists.contains_key(*partition) {
-                return Err(StaticSchemaError::MissingCustomPartition(
-                    partition.to_string(),
-                ));
-            }
-        }
-    }
-
-    let mut existing_field_names: HashSet<String> = HashSet::new();
-
-    for mut field in static_schema.fields {
-        validate_field_names(&field.name, &mut existing_field_names)?;
-        if !time_partition.is_empty() && field.name == time_partition {
-            time_partition_exists = true;
-            field.data_type = "datetime".to_string();
-        }
-
-        let parsed_field = Fields {
-            name: field.name.clone(),
-
-            data_type: {
-                match field.data_type.as_str() {
-                    "int" => DataType::Int64,
-                    "double" | "float" => DataType::Float64,
-                    "boolean" => DataType::Boolean,
-                    "string" => DataType::Utf8,
-                    "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
-                    "string_list" => {
-                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
-                    }
-                    "int_list" => {
-                        DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
-                    }
-                    "double_list" | "float_list" => {
-                        DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
-                    }
-                    "boolean_list" => {
-                        DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
-                    }
-                    _ => DataType::Null,
-                }
-            },
-            nullable: default_nullable(),
-            dict_id: default_dict_id(),
-            dict_is_ordered: default_dict_is_ordered(),
-            metadata: HashMap::new(),
-        };
-
-        parsed_schema.fields.push(parsed_field);
-    }
-    if !time_partition.is_empty() && !time_partition_exists {
-        return Err(StaticSchemaError::MissingTimePartition(
-            time_partition.to_string(),
-        ));
-    }
-    add_parseable_fields_to_static_schema(parsed_schema)
-}
-
-fn add_parseable_fields_to_static_schema(
-    parsed_schema: ParsedSchema,
-) -> Result<Arc<Schema>, StaticSchemaError> {
-    let mut schema: Vec<Arc<Field>> = Vec::new();
-    for field in parsed_schema.fields.iter() {
-        let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
-        schema.push(Arc::new(field));
-    }
-
-    if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
-        return Err(StaticSchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY));
-    };
-
-    // add the p_timestamp field to the event schema to the 0th index
-    schema.insert(
-        0,
-        Arc::new(Field::new(
-            DEFAULT_TIMESTAMP_KEY,
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        )),
-    );
-
-    // prepare the record batch and new fields to be added
-    let schema = Arc::new(Schema::new(schema));
-    Ok(schema)
-}
-
-fn default_nullable() -> bool {
-    true
-}
-fn default_dict_id() -> i64 {
-    0
-}
-fn default_dict_is_ordered() -> bool {
-    false
-}
+use serde::{Deserialize, Serialize};
 
-fn validate_field_names(
-    field_name: &str,
-    existing_fields: &mut HashSet<String>,
-) -> Result<(), StaticSchemaError> {
-    if field_name.is_empty() {
-        return Err(StaticSchemaError::EmptyFieldName);
-    }
+use crate::event::DEFAULT_TIMESTAMP_KEY;
 
-    if !existing_fields.insert(field_name.to_string()) {
-        return Err(StaticSchemaError::DuplicateField(field_name.to_string()));
-    }
+const DEFAULT_NULLABLE: bool = true;
 
-    Ok(())
-}
+type FieldName = String;
+type FieldType = String;
 
 #[derive(Debug, thiserror::Error)]
 pub enum StaticSchemaError {
@@ -213,23 +46,153 @@ pub enum StaticSchemaError {
     #[error("field name cannot be empty")]
     EmptyFieldName,
 
-    #[error("duplicate field name: {0}")]
+    #[error("field {0:?} is duplicated")]
     DuplicateField(String),
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    #[test]
-    fn empty_field_names() {
-        let mut existing_field_names: HashSet<String> = HashSet::new();
-        assert!(validate_field_names("", &mut existing_field_names).is_err());
-    }
-
-    #[test]
-    fn duplicate_field_names() {
-        let mut existing_field_names: HashSet<String> = HashSet::new();
-        let _ = validate_field_names("test_field", &mut existing_field_names);
-        assert!(validate_field_names("test_field", &mut existing_field_names).is_err());
-    }
-}
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+struct SchemaFields {
+    name: FieldName,
+    data_type: FieldType,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct StaticSchema {
+    fields: Vec<SchemaFields>,
+}
+
+impl StaticSchema {
+    /// Converts the user supplied static schema into an arrow [`Schema`].
+    ///
+    /// Fields keep the order in which they were declared and are preceded by
+    /// the reserved [`DEFAULT_TIMESTAMP_KEY`] column at the 0th index.
+    ///
+    /// # Errors
+    ///
+    /// Fails when a field name is empty, duplicated or reserved, or when a
+    /// custom/time partition column is not declared in the schema.
+    pub fn convert_to_arrow_schema(
+        self,
+        time_partition: &str,
+        custom_partition: Option<&String>,
+    ) -> Result<Arc<Schema>, StaticSchemaError> {
+        // Names declared so far, for duplicate detection and partition lookups
+        let mut names: HashSet<&str> = HashSet::with_capacity(self.fields.len());
+
+        for field in &self.fields {
+            if field.name.is_empty() {
+                return Err(StaticSchemaError::EmptyFieldName);
+            }
+
+            if !names.insert(field.name.as_str()) {
+                return Err(StaticSchemaError::DuplicateField(field.name.clone()));
+            }
+        }
+
+        // Ensures all custom partitions are present in schema
+        if let Some(custom_partition) = custom_partition {
+            for partition in custom_partition.split(',') {
+                if !names.contains(partition) {
+                    return Err(StaticSchemaError::MissingCustomPartition(
+                        partition.to_owned(),
+                    ));
+                }
+            }
+        }
+
+        // Ensures default timestamp is not mentioned(as it is only inserted by parseable)
+        if names.contains(DEFAULT_TIMESTAMP_KEY) {
+            return Err(StaticSchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY));
+        }
+
+        // Ensures the time partition, when enabled, is present in schema
+        if !time_partition.is_empty() && !names.contains(time_partition) {
+            return Err(StaticSchemaError::MissingTimePartition(
+                time_partition.to_owned(),
+            ));
+        }
+
+        // add the p_timestamp field to the event schema to the 0th index
+        let mut schema: Vec<Arc<Field>> = Vec::with_capacity(self.fields.len() + 1);
+        schema.push(Arc::new(Field::new(
+            DEFAULT_TIMESTAMP_KEY,
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        )));
+
+        // Walk the fields in declared order; iterating a HashMap here would
+        // shuffle the columns differently on every call
+        for field in &self.fields {
+            // If time partitioning is enabled, that column is always datetime
+            let field_type = if field.name == time_partition {
+                "datetime"
+            } else {
+                field.data_type.as_str()
+            };
+            let data_type = match field_type {
+                "int" => DataType::Int64,
+                "double" | "float" => DataType::Float64,
+                "boolean" => DataType::Boolean,
+                "string" => DataType::Utf8,
+                "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
+                "string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+                "int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
+                "double_list" | "float_list" => {
+                    DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
+                }
+                "boolean_list" => {
+                    DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
+                }
+                // NOTE: unrecognised type names silently become Null columns
+                _ => DataType::Null,
+            };
+            schema.push(Arc::new(Field::new(&field.name, data_type, DEFAULT_NULLABLE)));
+        }
+
+        Ok(Arc::new(Schema::new(schema)))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn static_schema(fields: &[(&str, &str)]) -> StaticSchema {
+        StaticSchema {
+            fields: fields
+                .iter()
+                .map(|(name, data_type)| SchemaFields {
+                    name: name.to_string(),
+                    data_type: data_type.to_string(),
+                })
+                .collect(),
+        }
+    }
+
+    #[test]
+    fn empty_field_names() {
+        let schema = static_schema(&[("", "string")]);
+        assert!(matches!(
+            schema.convert_to_arrow_schema("", None),
+            Err(StaticSchemaError::EmptyFieldName)
+        ));
+    }
+
+    #[test]
+    fn duplicate_field_names() {
+        let schema = static_schema(&[("test_field", "int"), ("test_field", "string")]);
+        assert!(matches!(
+            schema.convert_to_arrow_schema("", None),
+            Err(StaticSchemaError::DuplicateField(_))
+        ));
+    }
+
+    #[test]
+    fn preserves_declared_field_order() {
+        let schema = static_schema(&[("zeta", "int"), ("alpha", "string"), ("mid", "boolean")])
+            .convert_to_arrow_schema("", None)
+            .unwrap();
+        let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
+        assert_eq!(names, [DEFAULT_TIMESTAMP_KEY, "zeta", "alpha", "mid"]);
+    }
+}