refactor: replace anyhow with custom StaticSchemaError #1208
Conversation
Walkthrough

This pull request enhances error handling in the static schema module by replacing generic anyhow errors with a custom `StaticSchemaError` type.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Converter as convert_static_schema_to_arrow_schema
    participant Validator as validate_field_names
    participant Parser as add_parseable_fields_to_static_schema
    Client->>Converter: Request schema conversion
    Converter->>Validator: Validate each field name
    Validator-->>Converter: Return OK/error (StaticSchemaError)
    Converter->>Parser: Process parseable fields
    Parser-->>Converter: Return schema/result (StaticSchemaError)
    Converter-->>Client: Return final schema or error
```
CLA Assistant Lite bot: All contributors have signed the CLA ✍️ ✅
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/static_schema.rs (2)
178-189: Consider adding field name format validation.

While the current validation handles empty and duplicate fields well, consider adding validation for field name format to prevent issues with special characters or spaces that might cause problems in downstream processing.

Example validation to consider:

```diff
 fn validate_field_names(
     field_name: &str,
     existing_fields: &mut HashSet<String>,
 ) -> Result<(), StaticSchemaError> {
     if field_name.is_empty() {
         return Err(StaticSchemaError::EmptyFieldName);
     }
+    // Validate field name format (alphanumeric and underscore only)
+    if !field_name.chars().all(|c| c.is_alphanumeric() || c == '_') {
+        return Err(StaticSchemaError::InvalidFieldNameFormat(field_name.to_string()));
+    }
     if !existing_fields.insert(field_name.to_string()) {
         return Err(StaticSchemaError::DuplicateField(field_name.to_string()));
     }
     Ok(())
 }
```
215-233: Add tests for partition and default time field validation.

While the current tests cover field name validation, consider adding tests for:

- Custom partition validation
- Time partition validation
- Default time field validation

Example additional tests:

```rust
#[test]
fn test_missing_custom_partition() {
    let schema = StaticSchema {
        fields: vec![SchemaFields {
            name: "field1".to_string(),
            data_type: "string".to_string(),
        }],
    };
    let custom_partition = "custom_field".to_string();
    assert!(matches!(
        convert_static_schema_to_arrow_schema(schema, "", Some(&custom_partition)),
        Err(StaticSchemaError::MissingCustomPartition(_))
    ));
}

#[test]
fn test_missing_time_partition() {
    let schema = StaticSchema {
        fields: vec![SchemaFields {
            name: "field1".to_string(),
            data_type: "string".to_string(),
        }],
    };
    assert!(matches!(
        convert_static_schema_to_arrow_schema(schema, "time_field", None),
        Err(StaticSchemaError::MissingTimePartition(_))
    ));
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1): src/static_schema.rs (5 hunks)
🔇 Additional comments (4)
src/static_schema.rs (4)
25-25: LGTM: Appropriate import for field name tracking.

The addition of `HashSet` to track unique field names is a good choice for efficient duplicate detection.
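The insert-returns-bool idiom behind this duplicate check can be shown outside the PR's codebase. Below is a stdlib-only sketch; `FieldError` is a hypothetical stand-in for the PR's `StaticSchemaError`, and the key point is that `HashSet::insert` returns `false` when the value was already present, giving an O(1) duplicate test:

```rust
use std::collections::HashSet;

// Hypothetical error type standing in for the PR's StaticSchemaError variants.
#[derive(Debug, PartialEq)]
enum FieldError {
    EmptyFieldName,
    DuplicateField(String),
}

// Sketch of the validation the review describes: reject empty names, then use
// HashSet::insert's bool return for O(1) duplicate detection.
fn validate_field_names(
    field_name: &str,
    existing_fields: &mut HashSet<String>,
) -> Result<(), FieldError> {
    if field_name.is_empty() {
        return Err(FieldError::EmptyFieldName);
    }
    if !existing_fields.insert(field_name.to_string()) {
        return Err(FieldError::DuplicateField(field_name.to_string()));
    }
    Ok(())
}

fn main() {
    let mut seen = HashSet::new();
    assert!(validate_field_names("level", &mut seen).is_ok());
    // Second insertion of the same name is rejected.
    assert_eq!(
        validate_field_names("level", &mut seen),
        Err(FieldError::DuplicateField("level".to_string()))
    );
    assert_eq!(validate_field_names("", &mut seen), Err(FieldError::EmptyFieldName));
}
```

Passing the set by `&mut` lets one set accumulate across a whole schema's fields, which is how the PR's validation loop presumably uses it.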
62-62: LGTM: Improved error handling specificity.

The change from the generic `AnyError` to the specific `StaticSchemaError` enhances error handling clarity and type safety. Also applies to: 141-141.

85-85: LGTM: Clear and specific error messages.

The error messages are descriptive and provide actionable information about missing partitions and reserved fields. Also applies to: 134-134, 150-150.

192-213: LGTM: Well-structured error enum with clear messages.

The `StaticSchemaError` enum effectively categorizes different error cases with descriptive messages. Good use of thiserror for deriving the Error trait.
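For readers unfamiliar with the pattern, this stdlib-only sketch shows what such an error enum amounts to once thiserror has done its work: the PR derives `Display` and `Error` via `#[derive(thiserror::Error)]` and `#[error(...)]` attributes, whereas here the impls are written by hand. Variant names follow the PR description; the message texts are illustrative assumptions, not copied from the PR:

```rust
use std::fmt;

// Hand-written equivalent of what thiserror's derive would generate.
#[derive(Debug)]
pub enum StaticSchemaError {
    MissingCustomPartition(String),
    MissingTimePartition(String),
    DefaultTime,
    EmptyFieldName,
    DuplicateField(String),
}

impl fmt::Display for StaticSchemaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingCustomPartition(p) => {
                write!(f, "custom partition field {p} does not exist in the schema")
            }
            Self::MissingTimePartition(t) => {
                write!(f, "time partition field {t} does not exist in the schema")
            }
            Self::DefaultTime => write!(f, "field p_timestamp is a reserved field"),
            Self::EmptyFieldName => write!(f, "field names cannot be empty"),
            Self::DuplicateField(name) => write!(f, "duplicate field name: {name}"),
        }
    }
}

// Error requires Debug + Display, both provided above.
impl std::error::Error for StaticSchemaError {}

fn main() {
    let err = StaticSchemaError::DuplicateField("level".to_string());
    assert_eq!(err.to_string(), "duplicate field name: level");
}
```

The payload-carrying variants (`String` fields) are what let callers match on the failure and still surface the offending field name in logs.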
Looks good, but can you restrict the import for `HashSet` to the test mod and run `cargo fmt` to ensure stray whitespace is pruned?
Force-pushed from dff2fb4 to bd08e05
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/static_schema.rs (1)
217-233: Good test coverage for new validation logic.

The tests appropriately verify both validation rules (empty field names and duplicate field names). Consider adding tests for the other error variants as well to ensure complete coverage of the error-handling logic.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1): src/static_schema.rs (5 hunks)
🔇 Additional comments (8)
src/static_schema.rs (8)
25-28: Clean organization of imports.

The imports are well-organized with proper grouping of related types. Adding HashSet alongside HashMap makes sense for the field name validation feature.

65-65: Good transition to specific error type.

Changing the return type from a generic error to a specific `StaticSchemaError` improves error handling clarity and provides better context for callers.

88-88: Improved error reporting with specific error variant.

Replacing generic anyhow errors with the specific `MissingCustomPartition` variant provides better error context and aligns with the PR objective of enhancing error management.

93-97: Effective implementation of field name validation.

Using a HashSet to track existing field names is efficient (O(1) lookups) and correctly validates fields early in the processing flow.

136-136: Consistent error handling with specific variant.

The `MissingTimePartition` error variant provides clearer feedback compared to a generic error message.

143-143: Consistent error handling throughout the module.

The function signature changes maintain consistency with the rest of the module, and replacing anyhow with the `DefaultTime` error variant follows the same pattern of improved error specificity. Also applies to: 152-152.

180-191: Well-implemented field name validation.

The validation function is clean, focused, and efficient. It correctly checks for both empty field names and duplicates, returning appropriate error variants in each case.

194-215: Well-structured error enum with descriptive messages.

The `StaticSchemaError` enum is well-designed with specific variants for each error case. Using thiserror for deriving the Error trait is a good practice, and the error messages are clear and informative.
src/static_schema.rs (outdated)

```diff
-                "time partition field {time_partition} does not exist in the schema for the static schema logstream"
-            ),
-        });
+        return Err(StaticSchemaError::MissingTimePartition(time_partition.to_string()));
```

Suggested change (wrap for readability):

```suggestion
        return Err(StaticSchemaError::MissingTimePartition(
            time_partition.to_string(),
        ));
```
src/static_schema.rs (outdated)

```diff
@@ -83,11 +85,15 @@ pub fn convert_static_schema_to_arrow_schema(
     for partition in &custom_partition_list {
         if !custom_partition_exists.contains_key(*partition) {
-            return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
+            return Err(StaticSchemaError::MissingCustomPartition(partition.to_string()));
```

Suggested change (wrap for readability):

```suggestion
            return Err(StaticSchemaError::MissingCustomPartition(
                partition.to_string(),
            ));
```
src/static_schema.rs (outdated)

```diff
     }
     add_parseable_fields_to_static_schema(parsed_schema)
 }

 fn add_parseable_fields_to_static_schema(
     parsed_schema: ParsedSchema,
-) -> Result<Arc<Schema>, AnyError> {
+) -> Result<Arc<Schema>, StaticSchemaError> {
```
src/static_schema.rs (outdated)

```rust
fn validate_field_names(field_name: &str, existing_fields: &mut HashSet<String>) -> Result<(), StaticSchemaError> {
```

Suggested change (run `cargo fmt`):

```suggestion
fn validate_field_names(
    field_name: &str,
    existing_fields: &mut HashSet<String>,
) -> Result<(), StaticSchemaError> {
```
src/static_schema.rs (outdated)

```rust
#[derive(Debug, thiserror::Error)]
pub enum StaticSchemaError{
```

Suggested change (add the missing space before the brace):

```suggestion
#[derive(Debug, thiserror::Error)]
pub enum StaticSchemaError {
```
src/static_schema.rs (outdated)

```rust
    MissingTimePartition(String),

    #[error("field {DEFAULT_TIMESTAMP_KEY:?} is a reserved field")]
    DefaultTime,
```

Suggested change:

```suggestion
    ReservedKey(&'static str),
```
This should make more sense no?
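A stdlib-only sketch of the suggested variant: carrying the offending key as data lets one `ReservedKey` variant cover any reserved field, instead of a dedicated unit variant per key. The constant value and message format below are assumptions for illustration:

```rust
use std::fmt;

// Assumed constant; the PR refers to it as DEFAULT_TIMESTAMP_KEY.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// The reviewer's suggestion: one variant, parameterized by the reserved key.
#[derive(Debug)]
enum SchemaError {
    ReservedKey(&'static str),
}

impl fmt::Display for SchemaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // {key:?} renders the key with surrounding quotes, like the
            // original `#[error("field {DEFAULT_TIMESTAMP_KEY:?} ...")]`.
            SchemaError::ReservedKey(key) => write!(f, "field {key:?} is a reserved field"),
        }
    }
}

fn main() {
    let err = SchemaError::ReservedKey(DEFAULT_TIMESTAMP_KEY);
    assert_eq!(err.to_string(), "field \"p_timestamp\" is a reserved field");
}
```

If more reserved keys are added later, the construction site changes (`ReservedKey(OTHER_KEY)`) while the enum and its message stay untouched, which is the maintainability win behind the suggestion.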
src/static_schema.rs (outdated)

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
```

Suggested change: remove the line `use std::collections::HashSet;` (it is redundant, since `use super::*;` already brings it in).
I am sorry, I didn't understand why there was an import previously
Force-pushed from 2f0b686 to d61138f
Actionable comments posted: 1
🧹 Nitpick comments (13)
src/handlers/http/health_check.rs (3)

67-67: Consider confirming flush_and_convert successes.

`flush_and_convert` likely spawns tasks that join later. If the method returns additional information or errors, consider handling them here or logging them.

69-75: Partial failure handling might be enhanced.

The loop logs failures but continues. If complete reliability is critical, consider a strategy to handle or roll back partial failures.

77-86: Add or clarify retries on sync failure.

Currently, a warning is logged on sync failure, but the process proceeds. Consider adding a retry mechanism or a clear recovery path if remote upload is essential.

src/utils/arrow/mod.rs (1)

199-205: Beware of potential race conditions in time-based assertions.

Comparing the array's timestamp with `now.timestamp_millis()` might intermittently fail on slower systems when the test's execution slightly delays the array creation. Consider adding a small tolerance, or verifying exact values if the test must be deterministic.

src/sync.rs (4)
89-93: Error handling on sync handler joins.

Here, any `JoinError` is simply logged. If you need stronger guarantees (e.g., restarting tasks), consider a more robust fallback.

104-104: Remote sync handler reinitialization.

The code logs the error and restarts the thread. Ensure the logs are sufficient to debug repeated crashes if needed.

191-202: Robustly logs success or failure per flush-and-conversion task.

The approach is consistent with the new concurrency architecture. Optionally, consider collecting and reporting aggregated results if partial failures matter.

219-219: Panic handling logs the error and cleans up.

This is appropriate for catastrophic failures. If there is a chance of partial recovery, consider a separate fallback where beneficial.

src/parseable/streams.rs (3)
203-219: Fallback considerations for timestamps.

The function relies on file creation times and uses `.expect(..)`, which may panic on platforms lacking reliable creation metadata. Consider gracefully handling this scenario, possibly falling back to the modified time or skipping such files entirely if creation metadata is unavailable.

456-472: Metrics collection for total arrow files and sizes.

Unwrapping metadata can lead to panics if files are removed or become inaccessible mid-operation. Use safer error handling, or skip unreadable files, to avoid unexpected panics.

168-168: File path format string.

Embedding the hostname directly after the dot might obscure readability. Consider using a clearer delimiter (e.g., `.{hostname}.` or `-`).

src/static_schema.rs (2)

183-196: New function validate_field_names.

The implementation excludes empty names and duplicates. Consider clarifying whether whitespace-only names are recognized as empty:

```diff
 fn validate_field_names(
     field_name: &str,
     existing_fields: &mut HashSet<String>,
 ) -> Result<(), StaticSchemaError> {
+    // For example, you could trim whitespace before checking:
+    let trimmed = field_name.trim();
+    if trimmed.is_empty() {
+        return Err(StaticSchemaError::EmptyFieldName);
+    }
     ...
 }
```

220-236: Unit tests for validation logic.

Good coverage for both empty and duplicate field cases. Consider adding a test to verify behavior when the field name contains only whitespace or special characters.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)

- src/connectors/kafka/processor.rs (1 hunks)
- src/event/format/mod.rs (3 hunks)
- src/handlers/http/health_check.rs (2 hunks)
- src/handlers/http/ingest.rs (14 hunks)
- src/handlers/http/modal/mod.rs (0 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (4 hunks)
- src/lib.rs (1 hunks)
- src/parseable/mod.rs (3 hunks)
- src/parseable/streams.rs (15 hunks)
- src/query/listing_table_builder.rs (1 hunks)
- src/static_schema.rs (5 hunks)
- src/storage/mod.rs (0 hunks)
- src/storage/object_storage.rs (2 hunks)
- src/sync.rs (8 hunks)
- src/utils/arrow/mod.rs (3 hunks)
💤 Files with no reviewable changes (2)
- src/handlers/http/modal/mod.rs
- src/storage/mod.rs
🔇 Additional comments (47)
src/connectors/kafka/processor.rs (1)

69-75: Timestamp handling enhancement.

The addition of `Utc::now()` as a parameter to `into_recordbatch` ensures current timestamp information is correctly passed to the record batch creation process, improving the handling of time-sensitive operations across the codebase.

src/storage/object_storage.rs (3)

39-40: Updated import organization for logging levels.

The change properly updates import statements to reflect the change in logging level usage from debug to info for synchronization operations.

716-716: Code simplification for directory existence check.

The directory existence check has been simplified by directly calling `exists()` on the staging directory path, eliminating an unnecessary intermediate step.

722-722: Improved logging visibility for synchronization operations.

Changing from `debug!` to `info!` increases the visibility of stream synchronization operations in logs, which is appropriate for this important operational event.

src/query/listing_table_builder.rs (1)

34-37: Improved import organization.

The import statements have been consolidated into a cleaner format without changing functionality, improving code readability.

src/lib.rs (1)

62-70: Enhanced time interval constants with Duration type.

The changes improve type safety and clarity by:

- Converting time values from raw integers to Duration types
- Adding clear documentation for each constant's purpose
- Establishing a consistent relationship between constants, where `OBJECT_STORE_DATA_GRANULARITY` is derived from `LOCAL_SYNC_INTERVAL`

This refactoring provides better semantics and reduces the risk of misinterpreting time values.
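The constant relationship described above can be sketched with std's const-evaluated `Duration`. The names follow the review; the concrete values are illustrative assumptions, not the project's actual settings:

```rust
use std::time::Duration;

// Typed intervals instead of raw integers; units are carried by the type.
const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);

// Granularity in minutes, derived from the sync interval rather than
// hard-coded, so the two constants cannot drift apart.
const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

fn main() {
    assert_eq!(OBJECT_STORE_DATA_GRANULARITY, 1);
    assert!(STORAGE_UPLOAD_INTERVAL < LOCAL_SYNC_INTERVAL);
}
```

Because `Duration::from_secs` and `as_secs` are const fns, the derivation happens at compile time, and a mismatched pair of intervals would be caught the moment either constant changes.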
src/handlers/http/health_check.rs (2)

30-31: New concurrency and logging imports look appropriate.

These additions properly support asynchronous task management and structured logging without introducing any apparent conflicts.

64-65: JoinSet creation is a good concurrency approach.

Initializing a dedicated `JoinSet` makes the stream-flushing tasks more manageable. Just ensure that any lingering tasks won't outlive the shutdown, or if they do, that it is by design.

src/utils/arrow/mod.rs (3)

48-48: Chrono import is correct for date/time management.

No issues found. This is necessary to handle `DateTime<Utc>` references in the updated code.

136-137: Updated function signature improves clarity and testability.

Accepting an external `DateTime<Utc>` instead of generating it internally is a solid enhancement for reproducibility and unit testing.

211-211: Zero-size timestamp array test is valid.

Verifying boundary conditions for an empty array is good practice. No concerns here.

src/sync.rs (8)
23-23: JoinSet import aligns with concurrency refactoring.

No concerns; this library usage is consistent with the pattern seen elsewhere.

30-30: Centralizing intervals is beneficial.

Referencing `LOCAL_SYNC_INTERVAL` and `STORAGE_UPLOAD_INTERVAL` in one place promotes maintainability.

67-67: Accurate duration logging.

Using `start_time.elapsed()` clarifies how long a task took after the threshold warning. This enhances observability.

75-75: Switched to multi-thread runtime.

This is ideal if tasks are CPU-intensive or numerous. Verify that the number of worker threads is sufficient for your throughput needs.

79-80: Local and remote sync concurrency.

Spawning both local and remote sync tasks allows them to operate independently. Confirm that both tasks handle partial failures and final states consistently.

122-122: Regular object store sync interval.

Employing `interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL)` is a predictable approach. Just verify that large uploads complete before the next tick, or consider a check for in-progress tasks.

172-173: Documentation update for local_sync.

The docstring accurately represents that the function flushes and converts data to parquet. Good for clarity.

186-187: Local sync scheduling with JoinSet.

Each flush-and-convert call triggers concurrent tasks. It might be worth confirming the maximum concurrency impact (e.g., memory usage with too many tasks).

src/event/format/mod.rs (2)
111-111: Parameter addition enhances timestamp control.

Adding the `p_timestamp` parameter to the `into_recordbatch` method allows for more consistent timestamp handling across the codebase, enabling better testing and reproducibility.

149-149: Updated implementation uses the externally provided timestamp.

This change correctly passes the provided timestamp to the `get_timestamp_array` function instead of generating a new timestamp internally, ensuring timestamp consistency.

src/parseable/mod.rs (2)

461-464: Improved error handling for conflicting partitions.

Consolidating the error handling for when both time partition and custom partition are set improves code clarity and user experience.

613-618: Added validation to prevent conflicting partition configurations.

This validation ensures that time partition and custom partition cannot be set simultaneously, which is an important business rule check that prevents invalid configurations.
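The mutual-exclusion rule described in that comment can be distilled into a small sketch. The function name and `String` error type here are hypothetical simplifications (the real code presumably uses its `CreateStreamError`):

```rust
// A stream may set a time partition or a custom partition, but not both.
fn validate_partitions(
    time_partition: Option<&str>,
    custom_partition: Option<&str>,
) -> Result<(), String> {
    if time_partition.is_some() && custom_partition.is_some() {
        return Err("both time partition and custom partition cannot be set".to_string());
    }
    Ok(())
}

fn main() {
    // Either one alone is fine; together they are rejected.
    assert!(validate_partitions(Some("ts"), None).is_ok());
    assert!(validate_partitions(None, Some("tenant")).is_ok());
    assert!(validate_partitions(Some("ts"), Some("tenant")).is_err());
}
```

Running the check once at stream creation, before any schema processing, is what keeps the invalid combination from ever reaching the conversion path.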
src/handlers/http/modal/utils/ingest_utils.rs (5)

99-99: Centralizing timestamp creation improves consistency.

Creating a single timestamp variable at the beginning of the function ensures consistent timestamps throughout the request-processing lifecycle.

125-125: Using the stored timestamp for consistent behavior.

This change ensures that the same timestamp is used when no time partition is specified, improving consistency in the data.

148-148: Passing the timestamp through the function chain.

This properly passes the timestamp to the `into_event_batch` function, maintaining consistency with the changes to the function signature.

173-173: Updated function signature to support the timestamp parameter.

The `into_event_batch` function now accepts a timestamp parameter, which aligns with the changes in the `EventFormat` trait and ensures consistent timestamp handling.

180-180: Forwarding the timestamp to into_recordbatch.

This correctly passes the timestamp to the underlying `into_recordbatch` method, completing the chain of timestamp propagation.

src/handlers/http/ingest.rs (4)
82-82: Renaming and creating a timestamp variable for clarity.

Replacing `parsed_timestamp` with `now` and capturing the current UTC time creates a clearer understanding of the timestamp's purpose.

96-96: Passing the timestamp to the into_recordbatch method.

This correctly passes the timestamp to the function, which is consistent with the updated method signature.

104-104: Using a consistent timestamp for Event creation.

Using the same timestamp for the `parsed_timestamp` field ensures that all timestamp references within the request lifecycle are consistent.

399-399: Updated all test cases to include the timestamp parameter.

All test cases have been properly updated to include the timestamp parameter, ensuring that tests continue to work correctly with the new function signature. Also applies to: 433-433, 468-468, 500-500, 615-615, 667-667, 715-715, 757-757, 846-846.
src/parseable/streams.rs (10)

27-27: Leverage of SystemTime for file timestamps.

No major concerns. The standard library's `SystemTime` along with `UNIX_EPOCH` is a suitable choice for time-based operations.

33-33: Use of chrono for date/time operations.

The `chrono` imports here seem appropriate for extracting time components (hours, minutes, etc.).

45-45: Async concurrency with JoinSet.

Importing `tokio::task::JoinSet` is a clean approach for managing async tasks.

54-54: Referencing storage objects.

Bringing in `to_bytes`, `Retention`, and `StreamType` is consistent with the updated logic for reading and writing data.

442-442: Switching to SystemTime acquisition.

Using `SystemTime::now()` aligns well with `minute_from_system_time`. This provides consistent, OS-level timestamps.

673-674: Convenience method for flush and convert.

This succinctly flushes to disk and then delegates to `prepare_parquet`. The flow looks clear and maintainable.

744-750: Asynchronous flush_and_convert for all streams.

Spawning the conversion tasks per stream is efficient. However, if a single stream encounters an error, the handling of partial failures may warrant further consideration. Would you like additional checks or a concurrency test suite to verify robust error handling when multiple streams flush in parallel? Also applies to: 758-758.

769-769: chrono usage in tests.

These imports for test-time manipulations are standard and acceptable.

886-886: Test path consistency.

Ensuring the test's path format matches production is good for validation.

920-920: Test path format with custom partition.

No issues here; this string interpolation correctly includes the partitioning scheme.

src/static_schema.rs (5)
25-28: Introducing HashSet for field checking.

No concerns; this is an appropriate data structure for uniqueness checks.

65-65: Return type changed to StaticSchemaError.

Shifting from a generic error to a custom error type clarifies error semantics.

95-98: Validating field names before processing.

This check ensures correctness by preventing empty or duplicate field names from proceeding.

138-140: Custom error for missing time partition.

The dedicated `MissingTimePartition` error variant adequately highlights the missing-field scenario.

198-218: Definition of StaticSchemaError.

Well-structured enum with comprehensive variants for static schema issues.
src/parseable/mod.rs (outdated)

```diff
         if custom_partition_list.len() > 1 {
             return Err(CreateStreamError::Custom {
-                msg: "Maximum 3 custom partition keys are supported".to_string(),
+                msg: "Maximum 1 custom partition key is supported".to_string(),
                 status: StatusCode::BAD_REQUEST,
```
❓ Verification inconclusive
Breaking change: reduced maximum custom partition keys
The maximum number of custom partition keys has been reduced from 3 to 1. This is a breaking change that could affect existing users who are using multiple custom partition keys.
🏁 Script executed:

```shell
#!/bin/bash
# Find examples of code using multiple custom partition keys
rg -A 5 -B 5 "custom_partition.*split|split.*custom_partition" --glob "*.rs" | grep -v "len() > 1"
```

Length of output: 5563
Attention: Breaking Change Impact Detected
The change in src/parseable/mod.rs now enforces that only one custom partition key is allowed. The validator returns an error if `custom_partition.split(',')` yields more than one key, reducing the maximum from 3 to 1. This adjustment is intentionally breaking and may impact any users who rely on multiple custom partition keys. Please review all related usages (e.g., in ingest_utils.rs, flatten.rs, and object_storage.rs) to ensure that any external or internal callers are aware of this limitation.
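The validator behavior described here can be distilled into a small sketch; the function name and `String` error type are hypothetical simplifications of the real `CreateStreamError`-based code:

```rust
// Split the comma-separated custom partition list and reject more than one key.
fn validate_custom_partition(custom_partition: &str) -> Result<Vec<&str>, String> {
    let keys: Vec<&str> = custom_partition.split(',').collect();
    if keys.len() > 1 {
        return Err("Maximum 1 custom partition key is supported".to_string());
    }
    Ok(keys)
}

fn main() {
    // A single key passes; a comma-separated pair now fails.
    assert!(validate_custom_partition("tenant_id").is_ok());
    assert!(validate_custom_partition("tenant_id,region").is_err());
}
```

Note that `"a,b".split(',')` yields two items, so any existing stream config with multiple keys would start failing this check, which is exactly the breaking change flagged above.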
Force-pushed from d61138f to dc2a39f
Caution
Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently, please check "Code review limits" under "Moderation" settings.
Actionable comments posted: 1
♻️ Duplicate comments (1)

src/parseable/mod.rs (1)

819-822: ⚠️ Potential issue. Breaking change: reduced maximum custom partition keys.

The maximum number of custom partition keys has been reduced from 3 to 1. This is a breaking change that could affect existing users who are using multiple custom partition keys. Ensure that any external or internal callers are aware of this limitation, and verify that this change is properly documented in the release notes.
🛑 Comments failed to post (1)

src/parseable/streams.rs (1)

744-759: 🛠️ Refactor suggestion. Improved asynchronous processing with JoinSet.

The function now spawns asynchronous tasks for each stream's conversion process, leveraging Tokio's JoinSet for better task management and parallelism.

Consider adding error handling for the spawned tasks:

```diff
 pub fn flush_and_convert(
     &self,
     joinset: &mut JoinSet<Result<(), StagingError>>,
     shutdown_signal: bool,
 ) {
     let streams: Vec<Arc<Stream>> = self
         .read()
         .expect(LOCK_EXPECT)
         .values()
         .map(Arc::clone)
         .collect();
     for stream in streams {
-        joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
+        joinset.spawn(async move {
+            match stream.flush_and_convert(shutdown_signal) {
+                Ok(_) => Ok(()),
+                Err(e) => {
+                    error!("Error flushing and converting stream {}: {:?}", stream.stream_name, e);
+                    Err(e)
+                }
+            }
+        });
     }
 }
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test and benchmark the code to ensure it meets the requirements.

```rust
/// Asynchronously flushes arrows and compacts into parquet data on all streams in staging,
/// so that it is ready to be pushed onto objectstore.
pub fn flush_and_convert(
    &self,
    joinset: &mut JoinSet<Result<(), StagingError>>,
    shutdown_signal: bool,
) {
    let streams: Vec<Arc<Stream>> = self
        .read()
        .expect(LOCK_EXPECT)
        .values()
        .map(Arc::clone)
        .collect();
    for stream in streams {
        joinset.spawn(async move {
            match stream.flush_and_convert(shutdown_signal) {
                Ok(_) => Ok(()),
                Err(e) => {
                    error!(
                        "Error flushing and converting stream {}: {:?}",
                        stream.stream_name, e
                    );
                    Err(e)
                }
            }
        });
    }
}
```
Force-pushed from dc2a39f to 72f0ad6
Force-pushed from 710a4e9 to 756be55
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/static_schema.rs (1)
222-237: Good test coverage for validation.

Tests for both empty field names and duplicate field names ensure the validation function works as expected. Consider adding a positive test case that confirms valid field names pass validation:

```diff
 #[cfg(test)]
 mod tests {
     use super::*;

     #[test]
     fn empty_field_names() {
         let mut existing_field_names: HashSet<String> = HashSet::new();
         assert!(validate_field_names("", &mut existing_field_names).is_err());
     }

     #[test]
     fn duplicate_field_names() {
         let mut existing_field_names: HashSet<String> = HashSet::new();
         let _ = validate_field_names("test_field", &mut existing_field_names);
         assert!(validate_field_names("test_field", &mut existing_field_names).is_err());
     }
+
+    #[test]
+    fn valid_field_names() {
+        let mut existing_field_names: HashSet<String> = HashSet::new();
+        assert!(validate_field_names("field1", &mut existing_field_names).is_ok());
+        assert!(validate_field_names("field2", &mut existing_field_names).is_ok());
+    }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1): src/static_schema.rs (5 hunks)
🔇 Additional comments (10)
src/static_schema.rs (10)
25-28: Good organization of imports.

The structured import of standard library components is clean and well organized.

65-65: Improved error type specificity.

Changing the return type from a generic error to the specific `StaticSchemaError` improves type safety and error handling clarity.

88-90: Better error reporting for missing custom partition.

Using a specific error variant instead of `anyhow!` provides clearer error information.

94-95: Good validation approach using HashSet.

Using a HashSet for tracking field names is an efficient way to check for duplicates.

98-98: Proper validation integration.

The validation function is well integrated into the schema processing flow.

138-140: Enhanced error handling for missing time partition.

The specific error variant provides clearer context about the validation failure.

147-147: Consistent error type usage.

Updating the return type to match `StaticSchemaError` maintains consistency throughout the module.

155-157: Improved reserved key error handling.

The error now provides explicit information about which key is reserved.

185-198: Well-implemented field name validation.

The validation function is concise and effectively checks for both empty field names and duplicates.

200-220: Comprehensive error enum with helpful messages.

The `StaticSchemaError` enum is well structured, with descriptive error messages for each variant. Using the `thiserror` crate simplifies the error handling implementation.
Force-pushed from 756be55 to 4e233af
@7h3cyb3rm0nk please sign the CLA
Force-pushed from 4e233af to ec9827e
I have read the CLA Document and I hereby sign the CLA
Fixes #1197

This PR introduces two important changes.

Implementation details:

- Introduces a custom `StaticSchemaError` using thiserror, with specific error variants:
  - `MissingCustomPartition`
  - `MissingTimePartition`
  - `ReservedKey`
  - `EmptyFieldName`
  - `DuplicateField`

Summary by CodeRabbit

- New Features
- Bug Fixes