Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplification of static schema handling #1192

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
62 changes: 23 additions & 39 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::{
},
metadata::{LogStreamMetadata, SchemaVersion},
option::Mode,
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
static_schema::StaticSchema,
storage::{
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
ObjectStoreFormat, Owner, Permisssion, StreamType,
Expand Down Expand Up @@ -485,13 +485,28 @@ impl Parseable {
}
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
custom_partition.as_ref(),
static_schema_flag,
)?;
let schema = if static_schema_flag {
if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
}.into());
}

let static_schema: StaticSchema = serde_json::from_slice(body)?;
static_schema
.convert_to_arrow_schema(&time_partition, custom_partition.as_ref())
.map_err(|err| CreateStreamError::Custom {
msg: format!(
"Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
),
status: StatusCode::BAD_REQUEST,
})?
} else {
Arc::new(Schema::empty())
};

self.create_stream(
stream_name.to_string(),
Expand Down Expand Up @@ -778,37 +793,6 @@ impl Parseable {
}
}

/// Parses and validates a user-provided static schema for a new logstream.
///
/// When `static_schema_flag` is unset the stream is dynamically typed and an
/// empty Arrow schema is returned. Otherwise the request `body` must contain a
/// JSON-encoded `StaticSchema`, which is converted to an Arrow schema honouring
/// the configured `time_partition` and `custom_partition` fields.
///
/// # Errors
/// * `CreateStreamError::Custom` (400) when the body is empty or the schema
///   cannot be converted to an Arrow schema.
/// * A JSON deserialization error (converted via `From`) when the body is not
///   a valid `StaticSchema`.
pub fn validate_static_schema(
    body: &Bytes,
    stream_name: &str,
    time_partition: &str,
    custom_partition: Option<&String>,
    static_schema_flag: bool,
) -> Result<Arc<Schema>, CreateStreamError> {
    // Dynamic-schema streams carry no upfront schema definition.
    if !static_schema_flag {
        return Ok(Arc::new(Schema::empty()));
    }

    if body.is_empty() {
        return Err(CreateStreamError::Custom {
            msg: format!(
                "Please provide schema in the request body for static schema logstream {stream_name}"
            ),
            status: StatusCode::BAD_REQUEST,
        });
    }

    let static_schema: StaticSchema = serde_json::from_slice(body)?;
    let parsed_schema =
        convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
            // Surface the underlying conversion error instead of discarding it,
            // so the caller can see *why* the schema was rejected.
            .map_err(|err| CreateStreamError::Custom {
                msg: format!(
                    "Unable to commit static schema, logstream {stream_name} not created; Error: {err}"
                ),
                status: StatusCode::BAD_REQUEST,
            })?;

    Ok(parsed_schema)
}

pub fn validate_time_partition_limit(
time_partition_limit: &str,
) -> Result<NonZeroU32, CreateStreamError> {
Expand Down
216 changes: 77 additions & 139 deletions src/static_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,163 +16,101 @@
*
*/

use crate::event::DEFAULT_TIMESTAMP_KEY;
use crate::utils::arrow::get_field;
use anyhow::{anyhow, Error as AnyError};
use serde::{Deserialize, Serialize};
use std::str;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, TimeUnit};
use std::{collections::HashMap, sync::Arc};
/// User-supplied static schema definition, deserialized from the JSON body of
/// a stream-creation request.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct StaticSchema {
/// Declared fields; each pairs a field name with a textual data type.
fields: Vec<SchemaFields>,
}
use serde::{Deserialize, Serialize};

/// One field of a user-supplied static schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaFields {
/// Column name as it will appear in the Arrow schema.
name: String,
/// Textual type tag (e.g. "int", "string", "datetime") mapped to an Arrow
/// `DataType` during conversion.
data_type: String,
}
use crate::event::DEFAULT_TIMESTAMP_KEY;

/// Intermediate representation of a static schema after its textual data types
/// have been resolved to Arrow `DataType`s.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParsedSchema {
/// Resolved fields, later turned into `arrow_schema::Field`s.
pub fields: Vec<Fields>,
/// Schema-level key/value metadata.
pub metadata: HashMap<String, String>,
}
/// All user-declared fields are created as nullable Arrow fields.
const DEFAULT_NULLABLE: bool = true;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Fields {
name: String,
data_type: DataType,
nullable: bool,
dict_id: i64,
dict_is_ordered: bool,
metadata: HashMap<String, String>,
pub struct StaticSchema {
fields: Vec<SchemaFields>,
}

/// Placeholder for schema-level metadata; currently carries no data.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]

pub struct Metadata {}
pub fn convert_static_schema_to_arrow_schema(
static_schema: StaticSchema,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, AnyError> {
let mut parsed_schema = ParsedSchema {
fields: Vec::new(),
metadata: HashMap::new(),
};
let mut time_partition_exists = false;

if let Some(custom_partition) = custom_partition {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());
/// Errors produced while converting a user-supplied static schema into an
/// Arrow schema.
#[derive(Debug, thiserror::Error)]
pub enum StaticSchemaError {
/// A field named in the custom-partition list is absent from the schema.
#[error(
"custom partition field {0} does not exist in the schema for the static schema logstream"
)]
MissingCustom(String),
/// The configured time-partition field is absent from the schema.
#[error(
"time partition field {0} does not exist in the schema for the static schema logstream"
)]
MissingTime(String),
/// The schema declares the reserved internal timestamp column.
#[error("field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field")]
DefaultTime,
}

for partition in &custom_partition_list {
if static_schema
.fields
.iter()
.any(|field| &field.name == partition)
{
custom_partition_exists.insert(partition.to_string(), true);
impl StaticSchema {
pub fn convert_to_arrow_schema(
self,
time_partition: &str,
custom_partition: Option<&String>,
) -> Result<Arc<Schema>, StaticSchemaError> {
let mut schema: Vec<Arc<Field>> = Vec::new();
let mut time_partition_exists = false;

if let Some(custom_partition) = custom_partition {
for partition in custom_partition.split(',') {
if !self.fields.iter().any(|field| field.name == partition) {
return Err(StaticSchemaError::MissingCustom(partition.to_owned()));
}
}
}

for partition in &custom_partition_list {
if !custom_partition_exists.contains_key(*partition) {
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
for mut field in self.fields {
if field.name == DEFAULT_TIMESTAMP_KEY {
return Err(StaticSchemaError::DefaultTime);
}
}
}
for mut field in static_schema.fields {
if !time_partition.is_empty() && field.name == time_partition {
time_partition_exists = true;
field.data_type = "datetime".to_string();
}

let parsed_field = Fields {
name: field.name.clone(),
if !time_partition.is_empty() && field.name == time_partition {
time_partition_exists = true;
field.data_type = "datetime".to_string();
}

data_type: {
match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
}
"int_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
}
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
let data_type = match field.data_type.as_str() {
"int" => DataType::Int64,
"double" | "float" => DataType::Float64,
"boolean" => DataType::Boolean,
"string" => DataType::Utf8,
"datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
"string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
"int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
"double_list" | "float_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
}
},
nullable: default_nullable(),
dict_id: default_dict_id(),
dict_is_ordered: default_dict_is_ordered(),
metadata: HashMap::new(),
};
"boolean_list" => {
DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
}
_ => DataType::Null,
};
let field = Field::new(&field.name, data_type, DEFAULT_NULLABLE);
schema.push(Arc::new(field));
}

parsed_schema.fields.push(parsed_field);
}
if !time_partition.is_empty() && !time_partition_exists {
return Err(anyhow! {
format!(
"time partition field {time_partition} does not exist in the schema for the static schema logstream"
),
});
}
add_parseable_fields_to_static_schema(parsed_schema)
}
if !time_partition.is_empty() && !time_partition_exists {
return Err(StaticSchemaError::MissingTime(time_partition.to_owned()));
}

fn add_parseable_fields_to_static_schema(
parsed_schema: ParsedSchema,
) -> Result<Arc<Schema>, AnyError> {
let mut schema: Vec<Arc<Field>> = Vec::new();
for field in parsed_schema.fields.iter() {
let field = Field::new(field.name.clone(), field.data_type.clone(), field.nullable);
schema.push(Arc::new(field));
// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
Ok(Arc::new(Schema::new(schema)))
}

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
));
};

// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
let schema = Arc::new(Schema::new(schema));
Ok(schema)
}

/// Serde default for `Fields::nullable`: fields are nullable unless stated otherwise.
fn default_nullable() -> bool {
true
}
/// Serde default for `Fields::dict_id`: no dictionary encoding is assigned.
fn default_dict_id() -> i64 {
0
}
fn default_dict_is_ordered() -> bool {
false
/// One field of a user-supplied static schema.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SchemaFields {
/// Column name as it will appear in the Arrow schema.
name: String,
/// Textual type tag (e.g. "int", "string", "datetime") mapped to an Arrow
/// `DataType` during conversion.
data_type: String,
}
Loading