refactor: simplification of static schema handling #1192
base: main
Conversation
Walkthrough
The changes integrate static schema handling directly into the stream creation process.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Server
    participant StaticSchema
    Client->>Server: Send stream creation request
    Server->>Server: Check static_schema_flag in create_update_stream
    alt static_schema_flag is true
        Server->>Server: Validate request body is not empty
        Server->>StaticSchema: Deserialize body into StaticSchema
        StaticSchema->>Server: Execute convert_to_arrow_schema
        Server->>Client: Return Arrow schema or error response
    else
        Server->>Client: Process non-static schema request
    end
```
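To make the flow concrete, here is a minimal, self-contained sketch of that branch under simplified assumptions: the struct definitions are stand-ins for the PR's `StaticSchema`, the error type is a plain `String` instead of `CreateStreamError`, and the Arrow conversion is reduced to a trivial mapping rather than the real `convert_to_arrow_schema`.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use serde::Deserialize;

// Stand-ins for the PR's types in src/static_schema.rs; the field shapes
// follow the review excerpts, everything else here is illustrative.
#[derive(Deserialize)]
struct StaticSchema {
    fields: Vec<SchemaField>,
}

#[derive(Deserialize)]
struct SchemaField {
    name: String,
    data_type: String,
}

// Sketch of the static-schema branch that create_update_stream now takes.
fn build_schema(
    body: &[u8],
    static_schema_flag: bool,
    stream_name: &str,
) -> Result<Arc<Schema>, String> {
    if !static_schema_flag {
        // Dynamic streams start with an empty schema and infer fields on ingest.
        return Ok(Arc::new(Schema::empty()));
    }
    if body.is_empty() {
        return Err(format!(
            "schema body is required for static schema logstream {stream_name}"
        ));
    }
    let schema: StaticSchema = serde_json::from_slice(body)
        .map_err(|err| format!("invalid static schema: {err}"))?;
    // The real code calls StaticSchema::convert_to_arrow_schema here, which also
    // handles partitions and the reserved timestamp column; this sketch only maps
    // every declared field to a nullable Utf8 column.
    let fields: Vec<Field> = schema
        .fields
        .iter()
        .map(|f| Field::new(f.name.as_str(), DataType::Utf8, true))
        .collect();
    Ok(Arc::new(Schema::new(fields)))
}
```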
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/static_schema.rs (1)
32-33: Consider adding documentation to clarify the intended usage.
It might be helpful to add Rust documentation comments (`///`) explaining the purpose and usage of `convert_to_arrow_schema`. This will improve maintainability and ensure future contributors understand the function's constraints and requirements.

src/parseable/mod.rs (1)
498-506: Preserve the original error context when mapping errors.
Using `.map_err(|_| ...)` discards potentially useful debugging information from the underlying failure. Consider capturing the original error in your mapped message to facilitate troubleshooting.

```diff
-            .map_err(|_| CreateStreamError::Custom {
+            .map_err(|err| CreateStreamError::Custom {
                 msg: format!(
                     "Unable to commit static schema, logstream {stream_name} not created"
                 ),
                 status: StatusCode::BAD_REQUEST,
             })
```
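As a standalone illustration of the pattern being suggested (the function and names here are made up for the example), binding the error and interpolating it keeps the failure cause visible:

```rust
// Mapping with |err| and including it in the message preserves the original
// parse failure instead of discarding it.
fn parse_port(raw: &str) -> Result<u16, String> {
    raw.parse::<u16>()
        .map_err(|err| format!("invalid port {raw:?}: {err}"))
}

fn main() {
    // Prints something like: invalid port "not-a-port": invalid digit found in string
    println!("{}", parse_port("not-a-port").unwrap_err());
}
```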
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/parseable/mod.rs (2 hunks)
src/static_schema.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: coverage
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (8)
src/static_schema.rs (5)
41-47: Validation of custom partition fields is well-handled.
Your code correctly checks each requested partition name and errors out if it is absent in the schema. This clear early-failure mechanism helps maintain data integrity.
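As a rough sketch of that early-failure path (identifiers and the plain `String` error type are illustrative, not the crate's):

```rust
use std::collections::HashSet;

// Every requested custom partition must name a field declared in the static
// schema; the first unknown name aborts stream creation.
fn validate_custom_partitions(
    field_names: &[String],
    custom_partition: Option<&str>,
) -> Result<(), String> {
    let known: HashSet<&str> = field_names.iter().map(String::as_str).collect();
    if let Some(partitions) = custom_partition {
        for name in partitions.split(',').map(str::trim) {
            if !known.contains(name) {
                return Err(format!(
                    "custom partition field {name} does not exist in the schema"
                ));
            }
        }
    }
    Ok(())
}

fn main() {
    let fields = vec!["tenant".to_string(), "level".to_string()];
    assert!(validate_custom_partitions(&fields, Some("tenant,level")).is_ok());
    assert!(validate_custom_partitions(&fields, Some("region")).is_err());
}
```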
48-52: Approach for converting the time partition to a datetime type looks reasonable.
By assigning the `"datetime"` string to the `data_type` when the field name matches the time partition, you consistently enforce the time-aware format. Just ensure any dependent code in the pipeline recognizes this convention correctly.
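A minimal standalone sketch of that convention, with hypothetical function and parameter names:

```rust
// A field whose name matches the configured time partition has its declared
// type overridden to "datetime" before the string-to-Arrow type mapping runs.
fn effective_data_type(field_name: &str, declared_type: &str, time_partition: &str) -> String {
    if !time_partition.is_empty() && field_name == time_partition {
        "datetime".to_string()
    } else {
        declared_type.to_string()
    }
}

fn main() {
    assert_eq!(effective_data_type("created_at", "string", "created_at"), "datetime");
    assert_eq!(effective_data_type("level", "string", "created_at"), "string");
}
```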
88-90: Good defensive check on the time partition field.
Raising an error when `time_partition` is set but not actually present in the fields reduces confusion and ensures consistency in downstream logic.
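A standalone sketch of that check, with illustrative names and a plain `String` error in place of the crate's error type:

```rust
// If a time partition is configured, it must refer to a field that actually
// exists in the declared schema; otherwise fail fast.
fn validate_time_partition(field_names: &[String], time_partition: &str) -> Result<(), String> {
    if !time_partition.is_empty() && !field_names.iter().any(|f| f == time_partition) {
        return Err(format!(
            "time partition field {time_partition} does not exist in the schema"
        ));
    }
    Ok(())
}

fn main() {
    let fields = vec!["created_at".to_string(), "level".to_string()];
    assert!(validate_time_partition(&fields, "created_at").is_ok());
    assert!(validate_time_partition(&fields, "missing_field").is_err());
}
```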
98-103: Reserved field check is highly appreciated.
The explicit error for an existing `DEFAULT_TIMESTAMP_KEY` prevents accidental collisions with internal fields, ensuring data cleanliness and system predictability.
105-115: Prepending the timestamp field at index 0 is clear and consistent.
Inserting this field ensures that the server's internal timestamp column is always recognized and properly handled, preserving the ordering convention.
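A combined sketch of the reserved-field check and the prepend step, using arrow-schema types directly; the value assigned to `DEFAULT_TIMESTAMP_KEY` below is an assumption, the real constant is defined elsewhere in the crate:

```rust
use arrow_schema::{DataType, Field, TimeUnit};

// Assumed value for the sketch; the crate defines the real constant.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// Reject schemas that already declare the reserved column, then prepend it
// at index 0 so the internal timestamp is always the first field.
fn add_reserved_timestamp(mut fields: Vec<Field>) -> Result<Vec<Field>, String> {
    if fields.iter().any(|f| f.name() == DEFAULT_TIMESTAMP_KEY) {
        return Err(format!(
            "field {DEFAULT_TIMESTAMP_KEY} is reserved for internal use"
        ));
    }
    fields.insert(
        0,
        Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    );
    Ok(fields)
}

fn main() {
    let user_fields = vec![Field::new("level", DataType::Utf8, true)];
    let with_ts = add_reserved_timestamp(user_fields).unwrap();
    assert_eq!(with_ts[0].name(), DEFAULT_TIMESTAMP_KEY);
}
```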
src/parseable/mod.rs (3)
50-50: No issues noted with the new import.
488-496: Approach for verifying a non-empty schema body when `static_schema_flag` is set is sound.
Returning an immediate error if the request body is empty prevents incomplete or invalid static schemas from being created. This early check is appropriate.
507-509: Fallback to an empty Arrow schema is acceptable.
When the `static_schema_flag` is false, constructing an empty schema aligns with the design choice of a dynamic or yet-to-be-defined schema scenario.
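That fallback amounts to the arrow-schema empty constructor; a tiny sketch with an illustrative wrapper name:

```rust
use std::sync::Arc;
use arrow_schema::Schema;

// When static_schema_flag is false the stream starts with no declared
// columns; Schema::empty() from arrow-schema produces exactly that.
fn dynamic_stream_schema() -> Arc<Schema> {
    Arc::new(Schema::empty())
}

fn main() {
    assert_eq!(dynamic_stream_schema().fields().len(), 0);
}
```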
src/static_schema.rs (Outdated)
```rust
        let parsed_field = Fields {
            name: field.name.clone(),
            data_type: {
                match field.data_type.as_str() {
                    "int" => DataType::Int64,
                    "double" | "float" => DataType::Float64,
                    "boolean" => DataType::Boolean,
                    "string" => DataType::Utf8,
                    "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
                    "string_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
                    }
                    "int_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
                    }
                    "double_list" | "float_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
                    }
                    "boolean_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
                    }
                    _ => DataType::Null,
                }
            },
            nullable: default_nullable(),
            dict_id: default_dict_id(),
            dict_is_ordered: default_dict_is_ordered(),
            metadata: HashMap::new(),
        };

        fields.push(parsed_field);
    }
```
🛠️ Refactor suggestion
Return an explicit error for unrecognized data types instead of defaulting to null.
Falling back to `DataType::Null` for unknown types can silently mask typos or misunderstandings in the schema. Consider returning an error to highlight the invalid type rather than converting it to null.
```diff
-                    _ => DataType::Null,
+                    other => {
+                        return Err(anyhow!("Unsupported or unrecognized data type: {other}"));
+                    }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
        let parsed_field = Fields {
            name: field.name.clone(),
            data_type: {
                match field.data_type.as_str() {
                    "int" => DataType::Int64,
                    "double" | "float" => DataType::Float64,
                    "boolean" => DataType::Boolean,
                    "string" => DataType::Utf8,
                    "datetime" => DataType::Timestamp(TimeUnit::Millisecond, None),
                    "string_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Utf8, true)))
                    }
                    "int_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Int64, true)))
                    }
                    "double_list" | "float_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Float64, true)))
                    }
                    "boolean_list" => {
                        DataType::List(Arc::new(Field::new("item", DataType::Boolean, true)))
                    }
                    other => {
                        return Err(anyhow!("Unsupported or unrecognized data type: {other}"));
                    }
                }
            },
            nullable: default_nullable(),
            dict_id: default_dict_id(),
            dict_is_ordered: default_dict_is_ordered(),
            metadata: HashMap::new(),
        };

        fields.push(parsed_field);
    }
```
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/static_schema.rs (1)
73-88: ⚠️ Potential issue
Return an explicit error for unrecognized data types instead of defaulting to null.
Falling back to `DataType::Null` for unknown types can silently mask typos or misunderstandings in the schema. Consider returning an error to highlight the invalid type:
```diff
-                    _ => DataType::Null,
+                    other => {
+                        return Err(StaticSchemaError::InvalidType(other.to_owned()));
+                    }
```

This requires adding a new variant to `StaticSchemaError`:

```rust
#[error("unsupported or unrecognized data type: {0}")]
InvalidType(String),
```
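For orientation, a hedged sketch of how that variant could sit in a thiserror-based enum; only `InvalidType` comes from the suggestion above, the other variants are placeholders standing in for the PR's existing error cases:

```rust
use thiserror::Error;

// Only InvalidType mirrors the suggested addition; the remaining variants are
// illustrative stand-ins for the crate's existing cases.
#[derive(Debug, Error)]
pub enum StaticSchemaError {
    #[error("custom partition field {0} does not exist in the schema")]
    MissingCustomPartition(String),
    #[error("time partition field {0} does not exist in the schema")]
    MissingTimePartition(String),
    #[error("unsupported or unrecognized data type: {0}")]
    InvalidType(String),
}

fn main() {
    let err = StaticSchemaError::InvalidType("uuid".to_string());
    assert_eq!(err.to_string(), "unsupported or unrecognized data type: uuid");
}
```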
🧹 Nitpick comments (2)
src/static_schema.rs (2)
48-52: Add documentation for the schema conversion method.
Consider adding documentation to explain:
- The purpose of the method
- Parameter descriptions
- Return value details
- Examples of usage
Add this documentation above the method:
```diff
+/// Converts a static schema to an Arrow schema.
+///
+/// # Arguments
+/// * `time_partition` - The name of the time partition field
+/// * `custom_partition` - Optional comma-separated list of custom partition fields
+///
+/// # Returns
+/// An Arrow schema wrapped in an Arc, or a StaticSchemaError if validation fails
+///
+/// # Errors
+/// Returns an error if:
+/// - Custom partition fields don't exist in the schema
+/// - Time partition field doesn't exist in the schema
+/// - Schema contains the reserved timestamp field
 pub fn convert_to_arrow_schema(
```
112-116: Add documentation for the SchemaFields struct.
Consider adding documentation to explain the purpose and usage of this struct.
Add this documentation above the struct:
```diff
+/// Represents a field in a static schema.
+///
+/// # Fields
+/// * `name` - The name of the field
+/// * `data_type` - The data type of the field as a string
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct SchemaFields {
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/static_schema.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/static_schema.rs (2)
26-31: LGTM! Clean and focused struct definition.
The simplified `StaticSchema` struct with a single field aligns well with the PR's objective of simplifying static schema handling.
33-45: LGTM! Well-structured error handling.
The `StaticSchemaError` enum provides clear, specific error types with descriptive messages, making it easier to debug and handle schema-related issues.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/static_schema.rs (1)
94-109: ⚠️ Potential issue
Return an error for unrecognized data types instead of defaulting to null.
Defaulting to `DataType::Null` for unknown types can silently mask schema errors. Apply this fix:
```diff
-            _ => DataType::Null,
+            other => {
+                return Err(StaticSchemaError::InvalidType(field_name.clone(), other.to_string()));
+            }
```

Add the new error variant:

```rust
#[error("unsupported data type '{1}' for field '{0}'")]
InvalidType(String, String),
```
🧹 Nitpick comments (2)
src/static_schema.rs (2)
28-30: Add documentation for public types and type aliases.
Consider adding documentation comments to improve code maintainability:
- Document the `FieldName` and `FieldType` type aliases
- Add documentation for the `SchemaFields` and `StaticSchema` structs
+/// Represents the name of a field in the schema type FieldName = String; +/// Represents the data type of a field in the schema type FieldType = String; +/// Represents a field in the schema with its name and data type #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] struct SchemaFields { name: FieldName, data_type: FieldType, } +/// Represents a static schema containing a collection of fields #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct StaticSchema { fields: Vec<SchemaFields>, }Also applies to: 45-54
100-107: Make list item field names more descriptive.
The hardcoded "item" field name in list types could be more descriptive.
Consider making the item field name match the list type:
- "string_list" => DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), - "int_list" => DataType::List(Arc::new(Field::new("item", DataType::Int64, true))), + "string_list" => DataType::List(Arc::new(Field::new("string_value", DataType::Utf8, true))), + "int_list" => DataType::List(Arc::new(Field::new("integer_value", DataType::Int64, true))),
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/static_schema.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (1)
src/static_schema.rs (1)
31-43: LGTM! Well-structured error handling.
The error handling implementation is clean and follows best practices:
- Specific error variants for each failure case
- Clear and descriptive error messages
- Proper use of thiserror
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/parseable/mod.rs (2)
465-470: Potential redundant validation check.
This validation check seems to be a duplicate of the one at lines 633-638 in the `validate_and_update_custom_partition` method. However, since this check is applied at stream creation time, while the other check is for updates, both might be needed to ensure consistency. Consider adding a comment explaining the validation's purpose at stream creation time.
472-493: Code simplification looks good.
The schema handling logic has been appropriately integrated into the `create_update_stream` method, replacing the separate `validate_static_schema` function. This simplifies the code flow while maintaining clear error messages with appropriate context.

One suggestion: Consider adding a brief comment explaining the static schema conversion process since it's a core part of the refactoring.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/parseable/mod.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/parseable/mod.rs (2)
50-50: Import statement updated correctly.
The import has been appropriately updated to import just the `StaticSchema` struct, which matches the new refactored approach.
482-485: ❓ Verification inconclusive
Verify compatibility with large schema payloads.
The schema deserialization and conversion now happen directly in the API request handling path. Ensure this process is efficient enough for large schema payloads to avoid potential request timeouts.
🏁 Script executed:
```bash
#!/bin/bash
# Find usages of convert_to_arrow_schema to understand its complexity
# and assess if it might be expensive for large schemas
echo "Searching for convert_to_arrow_schema implementation and usages..."
rg -A 10 "convert_to_arrow_schema"

# Check for any schema size limits or handling in the codebase
echo "Searching for any schema size limits or handling..."
rg -i "schema.*(size|limit)"
```

Length of output: 1748
Action Required: Validate Performance for Large Schema Payloads
The conversion process in `src/static_schema.rs::convert_to_arrow_schema` is directly invoked in the API request path without apparent safeguards or predefined limits on schema size. While the implementation iterates over individual schema fields, which may be acceptable for typical payloads, it's important to verify that the routine scales efficiently when handling unusually large schemas. I recommend the following (a rough timing harness for the first point is sketched after this list):
- Benchmarking the conversion process with large schema payloads to observe its impact on request latency.
- Considering the implementation of size limits or optimizations if performance degradation is detected under heavy load.
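A rough, standalone timing harness for the benchmarking point, under the assumption that the payload is a `fields` array of name/data_type pairs as shown in the review excerpts; swapping the plain `serde_json` parse for the real deserialize-plus-`convert_to_arrow_schema` path would measure the full route:

```rust
use std::time::Instant;

fn main() {
    // Build a synthetic static-schema payload with many fields.
    let fields: Vec<String> = (0..10_000)
        .map(|i| format!(r#"{{"name":"field_{i}","data_type":"string"}}"#))
        .collect();
    let payload = format!(r#"{{"fields":[{}]}}"#, fields.join(","));

    // Time only the JSON parse here; the real benchmark would also run the
    // schema conversion on the parsed value.
    let start = Instant::now();
    let parsed: serde_json::Value = serde_json::from_str(&payload).expect("valid JSON");
    println!(
        "parsed {} fields in {:?}",
        parsed["fields"].as_array().map(|a| a.len()).unwrap_or(0),
        start.elapsed()
    );
}
```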
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
New Features
Refactor