Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplification of static schema handling #1192

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
62 changes: 23 additions & 39 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
},
metadata::{LogStreamMetadata, SchemaVersion},
option::Mode,
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
static_schema::StaticSchema,
storage::{
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
ObjectStoreFormat, Owner, Permisssion, StreamType,
Expand Down Expand Up @@ -485,13 +485,28 @@ impl Parseable {
}
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
custom_partition.as_ref(),
static_schema_flag,
)?;
let schema = if static_schema_flag {
if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
}.into());
}

let static_schema: StaticSchema = serde_json::from_slice(body)?;
static_schema
.convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
.map_err(|err| CreateStreamError::Custom {
msg: format!(
"Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
),
status: StatusCode::BAD_REQUEST,
})?
} else {
Arc::new(Schema::empty())
};

self.create_stream(
stream_name.to_string(),
Expand Down Expand Up @@ -778,37 +793,6 @@ impl Parseable {
}
}

/// Validates and parses the static schema supplied in a stream-creation request.
///
/// * When `static_schema_flag` is `false`, the stream infers its schema from
///   incoming events, so an empty Arrow schema is returned.
/// * When `static_schema_flag` is `true`, `body` must contain a JSON-encoded
///   [`StaticSchema`], which is converted into an Arrow schema honoring the
///   configured time/custom partitions.
///
/// # Errors
/// Returns `CreateStreamError::Custom` with `400 Bad Request` when the body is
/// empty or the schema cannot be converted; JSON deserialization failures are
/// propagated via `?`.
pub fn validate_static_schema(
    body: &Bytes,
    stream_name: &str,
    time_partition: &str,
    custom_partition: Option<&String>,
    static_schema_flag: bool,
) -> Result<Arc<Schema>, CreateStreamError> {
    // Dynamic-schema streams start empty and grow as events arrive.
    if !static_schema_flag {
        return Ok(Arc::new(Schema::empty()));
    }

    if body.is_empty() {
        return Err(CreateStreamError::Custom {
            msg: format!(
                "Please provide schema in the request body for static schema logstream {stream_name}"
            ),
            status: StatusCode::BAD_REQUEST,
        });
    }

    let static_schema: StaticSchema = serde_json::from_slice(body)?;
    let parsed_schema =
        convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
            // Surface the underlying conversion error instead of discarding it,
            // so the client learns *why* the schema was rejected (e.g. a missing
            // partition field or a reserved field name).
            .map_err(|err| CreateStreamError::Custom {
                msg: format!(
                    "Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
                ),
                status: StatusCode::BAD_REQUEST,
            })?;

    Ok(parsed_schema)
}

pub fn validate_time_partition_limit(
time_partition_limit: &str,
) -> Result<NonZeroU32, CreateStreamError> {
Expand Down
227 changes: 88 additions & 139 deletions src/static_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,163 +16,112 @@
*
*/

use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::utils::arrow::get_field;
use anyhow::{anyhow, Error as AnyError};
use serde::{Deserialize, Serialize};
use std::str;
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use std::{collections::HashMap, sync::Arc};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StaticSchema {
fields: Vec<SchemaFields>,
}
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaFields {
name: String,
data_type: String,
use crate::event::DEFAULT_TIMESTAMP_KEY;

// All user-declared fields are created nullable, matching Arrow's permissive default.
const DEFAULT_NULLABLE: bool = true;

// Aliases documenting the meaning of the (name -> type-string) pairs used below.
type FieldName = String;
type FieldType = String;

/// Errors raised while converting a user-provided static schema into an Arrow schema.
#[derive(Debug, thiserror::Error)]
pub enum StaticSchemaError {
    /// A field named in the custom-partition configuration is absent from the schema.
    #[error(
        "custom partition field {0} does not exist in the schema for the static schema logstream"
    )]
    MissingCustom(String),
    /// The configured time-partition field is absent from the schema.
    #[error(
        "time partition field {0} does not exist in the schema for the static schema logstream"
    )]
    MissingTime(String),
    /// The schema declares the reserved timestamp field, which only Parseable may insert.
    #[error("field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field")]
    DefaultTime,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedSchema {
pub fields: Vec<Fields>,
pub metadata: HashMap<String, String>,
struct SchemaFields {
name: FieldName,
data_type: FieldType,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Fields {
name: String,
data_type: DataType,
nullable: bool,
dict_id: i64,
dict_is_ordered: bool,
metadata: HashMap<String, String>,
pub struct StaticSchema {
fields: Vec<SchemaFields>,
}

/// Empty placeholder for schema metadata; currently carries no fields.
/// NOTE(review): appears unused within this view — confirm before removing.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]

pub struct Metadata {}
pub fn convert_static_schema_to_arrow_schema(
static_schema: StaticSchema,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, AnyError> {
let mut parsed_schema = ParsedSchema {
fields: Vec::new(),
metadata: HashMap::new(),
};
let mut time_partition_exists = false;

if let Some(custom_partition) = custom_partition {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());

for partition in &custom_partition_list {
if static_schema
.fields
.iter()
.any(|field| &field.name == partition)
{
custom_partition_exists.insert(partition.to_string(), true);
impl StaticSchema {
pub fn convert_to_arrow_schema(
self,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, StaticSchemaError> {
let mut schema: Vec<Arc<Field>> = Vec::new();

// Convert to hashmap for easy access and operation
let mut fields: HashMap<FieldName, FieldType> = self
.fields
.into_iter()
.map(|field| (field.name, field.data_type))
.collect();

// Ensures all custom partitions are present in schema
if let Some(custom_partition) = custom_partition {
for partition in custom_partition.split(',') {
if !fields.contains_key(partition) {
return Err(StaticSchemaError::MissingCustom(partition.to_owned()));
}
}
}

for partition in &custom_partition_list {
if !custom_partition_exists.contains_key(*partition) {
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
}
// Ensures default timestamp is not mentioned(as it is only inserted by parseable)
if fields.contains_key(DEFAULT_TIMESTAMP_KEY) {
return Err(StaticSchemaError::DefaultTime);
}
}
for mut field in static_schema.fields {
if !time_partition.is_empty() && field.name == time_partition {
time_partition_exists = true;
field.data_type = "datetime".to_string();

// If time partitioning is enabled, mutate the datatype to be datetime
if !time_partition.is_empty() {
let Some(field_type) = fields.get_mut(time_partition) else {
return Err(StaticSchemaError::MissingTime(time_partition.to_owned()));
};
*field_type = "datetime".to_string();
}

let parsed_field = Fields {
name: field.name.clone(),

data_type: {
match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
}
"int_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
}
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
for (field_name, field_type) in fields {
let data_type = match field_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
"int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
},
nullable: default_nullable(),
dict_id: default_dict_id(),
dict_is_ordered: default_dict_is_ordered(),
metadata: HashMap::new(),
};

parsed_schema.fields.push(parsed_field);
}
if !time_partition.is_empty() && !time_partition_exists {
return Err(anyhow! {
format!(
"time partition field {time_partition} does not exist in the schema for the static schema logstream"
),
});
}
add_parseable_fields_to_static_schema(parsed_schema)
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
};
let field = Field::new(&field_name, data_type, DEFAULT_NULLABLE);
schema.push(Arc::new(field));
}

fn add_parseable_fields_to_static_schema(
parsed_schema: ParsedSchema,
) -> Result<Arc<Schema>, AnyError> {
let mut schema: Vec<Arc<Field>> = Vec::new();
for field in parsed_schema.fields.iter() {
let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
schema.push(Arc::new(field));
// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
Ok(Arc::new(Schema::new(schema)))
}

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
));
};

// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
let schema = Arc::new(Schema::new(schema));
Ok(schema)
}

// Default providers for `Fields` when deserializing a parsed schema:
// fields are nullable by default.
fn default_nullable() -> bool {
    true
}
// Arrow dictionary id defaults to 0 (no dictionary encoding configured).
fn default_dict_id() -> i64 {
    0
}
// Dictionary ordering is not assumed by default.
fn default_dict_is_ordered() -> bool {
    false
}
Loading