
refactor: event handling in parseable #1218

Open
wants to merge 44 commits into base: main

Conversation


@de-sh de-sh commented Mar 1, 2025

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query work.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Introduced enhanced log ingestion endpoints that now support payloads with size metadata.
    • Added dynamic schema management for more reliable log processing and event partitioning.
  • Refactor

    • Streamlined event processing to improve the handling of streaming logs and metrics.
    • Consolidated JSON handling to better support nested log structures.
  • Chore

    • Removed obsolete modules and redundant components to improve maintainability and overall system efficiency.


coderabbitai bot commented Mar 1, 2025

Walkthrough

This pull request introduces several refactors and improvements across multiple modules. In the Kafka connector, the event processing method has been overhauled to return a payload size instead of an event object. The event formatting modules have been updated to streamline JSON flattening, schema inference, and record batch processing. HTTP ingestion handlers are refactored to leverage a unified log ingestion interface using a new JsonWithSize type, while several legacy modules and functions have been removed. Additionally, new schema management and partitioning features have been added, alongside adjustments to error handling and type updates across the codebase.

Changes

File(s) Change Summary
src/connectors/kafka/processor.rs Refactored ParseableSinkProcessor: removed build_event_from_chunk and added process_event_from_chunk to return the payload size; streamlined imports.
src/event/format/json.rs, src/event/format/mod.rs Updated event formatting: Added flatten_logs, modified to_data, decode, into_event signatures, introduced new schema inference and validation methods.
src/handlers/http/ingest.rs Refactored log ingestion functions (ingest, handle_otel_logs_ingestion, handle_otel_metrics_ingestion, handle_otel_traces_ingestion, post_event) to use JsonWithSize and PARSEABLE.get_or_create_stream; removed ingest_internal_stream and related test code.
src/handlers/http/mod.rs, src/handlers/http/modal/ingest/ingestor_ingest.rs, src/handlers/http/modal/utils/ingest_utils.rs, src/handlers/http/modal/utils/mod.rs Removed deprecated modules and functions: deleted kinesis module declaration from HTTP, removed legacy ingestion utilities and HTTP POST event handler.
src/lib.rs Added a new kinesis module declaration.
src/parseable/streams.rs Added commit_schema to Stream, renamed variable (schema_keyprefix), and refined schema file handling and logging messages.
src/utils/json/mod.rs Refactored JSON utilities: removed convert_array_to_object, updated flatten_json_body to return the nested value directly, added type alias Json, and added new test cases.
src/event/mod.rs Modified Event: introduced new PartitionEvent struct and partitions field; updated processing methods to iterate over partitions; changed origin_size type and removed legacy commit_schema function.
src/handlers/http/logstream.rs Enhanced detect_schema: now checks that each log record is a JSON object and returns a custom error if not.
src/utils/arrow/flight.rs Updated append_temporary_events: now retrieves the schema via stream.get_schema() and passes a reference to stream in push_logs_unchecked.
src/handlers/http/query.rs Updated schema commitment: now uses PARSEABLE.get_or_create_stream and commits schema directly on the stream.
src/handlers/http/cluster/mod.rs Replaced ingest_internal_stream with an internal stream created via PARSEABLE.get_or_create_stream; updated cluster metrics processing and error logging.
src/handlers/http/cluster/utils.rs Introduced new JsonWithSize<T> struct with a FromRequest implementation to encapsulate JSON payloads along with their byte size.
src/metadata.rs Updated update_stats function by changing the size parameter from u64 to usize.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant KafkaProcessor
    participant Stream
    Client->>KafkaProcessor: Send chunk of ConsumerRecords
    KafkaProcessor->>KafkaProcessor: Process via process_event_from_chunk
    KafkaProcessor->>Stream: Call push_logs (with JSON array & log source)
    Stream-->>KafkaProcessor: Return payload size
    KafkaProcessor->>Client: Log processed size
sequenceDiagram
    participant HTTPRequest
    participant IngestHandler
    participant PARSEABLE
    participant Stream
    HTTPRequest->>IngestHandler: Send event with JsonWithSize (json, byte_size)
    IngestHandler->>PARSEABLE: get_or_create_stream(stream_name)
    PARSEABLE-->>IngestHandler: Return Stream
    IngestHandler->>Event: Create new json::Event using json, byte_size, log_source
    IngestHandler->>Stream: Call push_logs with stream reference
    Stream-->>IngestHandler: Return processing result

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

I'm a little rabbit, hopping with glee,
Celebrating changes from code to decree,
Refactored paths and processes anew,
Streams and logs now dancing in view.
With bytes hopped neatly and errors made light,
In our code realm, everything feels just right!
🐇💻


@de-sh de-sh marked this pull request as ready for review March 1, 2025 07:32

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (2)
src/event/format/json.rs (2)

63-108: Great addition of flatten_logs, but watch out for large JSON inputs.
This function elegantly handles different log sources (Kinesis, OTEL, etc.) and converts them into flattened JSON arrays using specialized transformers. However, for very large JSON inputs, the repeated conversions and appending could become memory-intensive. If scaling is expected, consider streaming approaches or chunk processing.


310-311: Consider partial ingestion for non-object JSON entries.
collect_keys returns None if a single entry is not an object. Depending on your requirements, you might want to skip or sanitize malformed entries instead of completely halting ingestion.
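A minimal sketch of the skip-instead-of-halt variant suggested here, assuming collect_keys receives an iterator over serde_json::Value references (the real signature may differ):

use std::collections::HashSet;
use serde_json::Value;

// Hypothetical variant: ignore non-object entries instead of rejecting the
// whole batch, so malformed records do not halt ingestion.
fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> HashSet<&'a str> {
    values
        .filter_map(|value| value.as_object())          // skip anything that is not a JSON object
        .flat_map(|obj| obj.keys().map(String::as_str)) // gather the field names it does contain
        .collect()
}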

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e732821 and 1386d3b.

📒 Files selected for processing (12)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/format/json.rs (10 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (6 hunks)
  • src/handlers/http/mod.rs (0 hunks)
  • src/handlers/http/modal/ingest/ingestor_ingest.rs (2 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (0 hunks)
  • src/handlers/http/modal/utils/mod.rs (0 hunks)
  • src/lib.rs (1 hunks)
  • src/parseable/streams.rs (3 hunks)
  • src/utils/arrow/mod.rs (2 hunks)
  • src/utils/json/mod.rs (1 hunks)
💤 Files with no reviewable changes (3)
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/utils/mod.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (24)
src/lib.rs (1)

32-32: Added the kinesis module to support AWS Kinesis-based log ingestion.

This addition aligns well with the event handling refactor objective and enhances Parseable's data source support.

src/handlers/http/modal/ingest/ingestor_ingest.rs (2)

22-22: Updated import to use new push_logs method.

This change is part of the event handling refactoring, replacing the previous flatten_and_push_logs function with the simplified push_logs.


41-41: Function call updated to use the new push_logs interface.

The change maintains the same parameter signature while leveraging the improved backend implementation.

src/parseable/streams.rs (1)

549-549: Improved log formatting consistency.

Removed the line break in the warning message for better log readability and consistency.

src/utils/json/mod.rs (3)

282-295: Good test for error handling with invalid input.

This test ensures that passing non-object arrays to the conversion function results in appropriate errors, enhancing error handling verification.


297-356: Comprehensive test for nested JSON object handling with SchemaVersion::V0.

This test verifies the correct flattening behavior for arrays of objects containing nested structures, ensuring proper JSON transformation with the older schema version.


358-417: Parallel test case for SchemaVersion::V1 behavior.

This test ensures the flattening behavior works correctly with the newer schema version, showing the expected differences in how nested objects are processed compared to V0.

src/connectors/kafka/processor.rs (3)

28-30: Simplified import statement for better readability.

The consolidated import statement improves code organization by grouping related imports together.


39-68: Well-structured refactoring of the event processing logic.

This is a significant improvement that simplifies the event processing flow:

  1. Method renamed from build_event_from_chunk to process_event_from_chunk to better reflect its purpose
  2. Return type changed from ParseableEvent to u64 (payload size) which is more relevant for this context
  3. Direct use of push_logs eliminates the need to extract schema-related attributes

The new implementation is more straightforward and focuses on the essential task of pushing logs to the stream.
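A rough sketch of the size-tracking half of this flow, based only on the summary above; the helper name and error handling are illustrative, and the real method additionally pushes the resulting array to the stream via push_logs:

use serde_json::Value;

// Hypothetical sketch: decode raw Kafka payloads, sum their byte size, and
// return the combined JSON array together with that size.
fn collect_chunk(payloads: &[Vec<u8>]) -> anyhow::Result<(Value, u64)> {
    let mut payload_size = 0u64;
    let mut json_values = Vec::with_capacity(payloads.len());

    for payload in payloads {
        payload_size += payload.len() as u64; // origin size in bytes, not string length
        json_values.push(serde_json::from_slice::<Value>(payload)?);
    }

    Ok((Value::Array(json_values), payload_size))
}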


77-79: Updated method call with appropriate logging.

The logging now correctly displays the size of processed records in bytes, which provides more useful operational metrics.

src/utils/arrow/mod.rs (2)

64-65: Documentation updated to reflect new parameter structure.

The documentation properly explains the new parameter format.


179-179: Test updated to use the new parameter format.

Good practice to ensure tests remain in sync with code changes.

src/event/format/mod.rs (5)

101-109: Enhanced method signature with additional parameters.

The to_data method signature has been expanded to include:

  1. time_partition_limit - Provides control over time partition boundaries
  2. custom_partition - Allows for custom partitioning logic
  3. Renamed the log source parameter for clarity

These additions provide more flexibility in how event data is processed and partitioned.


116-126: Added corresponding parameters to into_recordbatch method.

Ensures consistency between related methods and properly propagates the new parameters.


128-135: Updated method call with new parameters.

The call to self.to_data has been properly updated to include all the new parameters, maintaining consistency throughout the code.


170-174: Updated replace_columns call to match new parameter format.

The call has been correctly updated to use the tuple format that was introduced in the utils/arrow/mod.rs file, showing good cross-module consistency.


179-192: Updated into_event method signature.

The method signature has been appropriately updated to include the new parameters, ensuring consistency with the other method changes in this file.

src/handlers/http/ingest.rs (5)

83-88: Streamlined internal stream ingestion.

Similar to the main ingestion endpoint, this function has been simplified to directly get the stream and push logs to it, maintaining consistency across the codebase.


116-120: Consistent approach for OTEL logs ingestion.

The OTEL-specific ingestion handler now follows the same simplified pattern as the other endpoints, providing a consistent approach throughout the codebase.


150-154: Consistent approach for OTEL metrics ingestion.

The OTEL metrics ingestion follows the same pattern, further demonstrating the consistency of the refactored implementation.


181-185: Consistent approach for OTEL traces ingestion.

The OTEL traces ingestion also follows the same simplified pattern, completing the consistent implementation across all ingestion endpoints.


233-237: Consistent approach for direct stream posting.

The post_event handler follows the same pattern as other ingestion endpoints, completing the consistent implementation across all entry points.

src/event/format/json.rs (2)

301-302: Confirm that forcing None on unknown fields aligns with your schema evolution strategy.
Returning None when encountering a new field triggers a new schema inference path. This is correct if you want dynamic schema evolution, but if you intend to reject unknown fields, you may want to handle it differently.


394-395: Good enhancement for test coverage.
Using explicit type imports (Int64Type) clarifies unit tests and ensures correctness in arrow array comparisons. This makes the test suite more readable.

Comment on lines +154 to +163
if static_schema_flag
    && schema.iter().any(|field| {
        storage_schema
            .get(field.name())
            .is_none_or(|storage_field| storage_field != field)
    })
{
    return Err(anyhow!("Schema mismatch"));
}


🛠️ Refactor suggestion

Simplified schema validation with inline logic.

The previous separate is_schema_matching method has been replaced with inline schema validation logic that:

  1. Is more concise and easier to understand
  2. Directly expresses the validation condition
  3. Uses modern Rust pattern with is_none_or for cleaner code

This change improves readability while maintaining the same functionality.
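For reference, a tiny standalone example of the Option::is_none_or semantics this check relies on (stable in the standard library since Rust 1.82): a missing field is treated the same as a mismatching one.

fn main() {
    // is_none_or(pred) is true when the Option is None, or when the contained
    // value satisfies the predicate.
    let storage_field: Option<&str> = None;
    assert!(storage_field.is_none_or(|f| f != "timestamp")); // missing field counts as a mismatch

    let storage_field = Some("message");
    assert!(storage_field.is_none_or(|f| f != "timestamp")); // present but different: mismatch

    let storage_field = Some("timestamp");
    assert!(!storage_field.is_none_or(|f| f != "timestamp")); // present and equal: no mismatch
}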

Comment on lines 73 to 77
PARSEABLE
    .get_or_create_stream(&stream_name)
    .push_logs(json, &log_source)
    .await?;


🛠️ Refactor suggestion

Simplified log ingestion with direct push_logs call.

This is a significant improvement that:

  1. Replaces complex flattening and processing logic with a direct method call
  2. Centralizes the log processing logic in the push_logs implementation
  3. Makes the code more maintainable by reducing complexity at the HTTP handler level

The new approach provides a cleaner separation of concerns between the HTTP handling and log processing logic.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/handlers/http/logstream.rs (1)

117-123: Consider including context (such as array index) in error message.

Right now, the error doesn’t indicate which array element triggered this failure, which might complicate debugging. Including the index of the invalid item in the message (e.g., “Expected an object at index: X”) could help troubleshoot large payloads.

src/utils/json/mod.rs (1)

259-274: Test coverage for error case might be more explicit.

Verifying that flatten_json_body fails for arrays containing non-object elements is great. If you’d like deeper assertions, you could test the exact error type or message for increased robustness.

src/event/mod.rs (1)

58-95: Key concatenation may benefit from a delimiter before timestamps.

Right now, key.push_str(&parsed_timestamp_to_min); appends the timestamp directly to the hashed schema key. Using a delimiter (e.g., _ or #) could reduce the chance of collisions or confusion in the resulting key string.
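A small hypothetical helper illustrating that suggestion; the key layout, delimiter, and function name are made up for illustration:

use std::collections::HashMap;
use chrono::NaiveDateTime;

// Hypothetical: join the schema hash, the minute-truncated timestamp, and any
// custom partition values with an explicit delimiter to avoid ambiguous keys.
fn partition_key(
    schema_hash: &str,
    parsed_timestamp: NaiveDateTime,
    custom_partition_values: &HashMap<String, String>,
) -> String {
    let mut key = format!("{schema_hash}#{}", parsed_timestamp.format("%Y%m%dT%H%M"));
    for (field, value) in custom_partition_values {
        key.push_str(&format!("#{field}={value}"));
    }
    key
}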

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 38a52c2 and a7b2db3.

📒 Files selected for processing (6)
  • src/event/format/json.rs (9 hunks)
  • src/event/format/mod.rs (7 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/ingest.rs (7 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/utils/json/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/event/format/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (17)
src/utils/json/mod.rs (5)

25-25: Minor import expansion: no immediate issue.

Importing Map from serde_json is fine. Just be mindful of potential naming collisions with other modules named Json. Otherwise, this looks good.


32-32: Type alias is clear and improves readability.

Defining Json as a Map<String, Value> is a neat approach to standardize usage throughout the codebase.


67-67: Returning nested_value directly seems appropriate.

This line finalizes the flattening process correctly, no further concerns here.


276-333: Test naming and logic appear correct.

The test validates flattening with nested objects for SchemaVersion::V0. This thorough example increases confidence in the flattening logic.


335-392: Additional test for SchemaVersion::V1 is well-structured.

Ensuring different schema versions flatten data in the expected manner is excellent; no issues noted here.

src/event/mod.rs (2)

39-43: New PartitionEvent struct is a useful abstraction.

Encapsulating RecordBatch, timestamp, and partition data fosters clearer multi-partition handling. Confirm that storing RecordBatch by value doesn’t lead to unintended memory usage if events become large. Otherwise, this design is well-structured.

Also applies to: 51-51


100-111: Verify omission of time partition logic in process_unchecked.

Unlike the main process method, process_unchecked does not append timestamps or custom partition values to the computed key. If grouping data by time or custom partitions is still desired, consider adding them here too.

src/handlers/http/ingest.rs (5)

73-77: Simplified log ingestion with direct push_logs call.

This is a significant improvement that:

  1. Replaces complex flattening and processing logic with a direct method call
  2. Centralizes the log processing logic in the push_logs implementation
  3. Makes the code more maintainable by reducing complexity at the HTTP handler level

The new approach provides a cleaner separation of concerns between the HTTP handling and log processing logic.


83-87: Good simplification of internal stream ingestion.

The code has been streamlined by removing schema retrieval and event creation logic, instead directly pushing logs to the internal stream. This maintains consistency with the other ingestion endpoints and reduces code complexity.


116-119: Consistent use of push_logs across all ingestion handlers.

All OpenTelemetry ingestion handlers now use the same pattern of getting the stream and directly pushing logs. This consistency improves maintainability and ensures uniform behavior across different log types.

Also applies to: 150-153, 181-184


233-236: Consistent log handling in post_event.

The post_event endpoint now follows the same pattern as other ingestion endpoints, maintaining consistency throughout the codebase.


250-251: Clarify the uncertainty in the comment.

The comment "NOTE: Maybe should be false" suggests uncertainty about the correct value for is_first_event. This should be resolved rather than left as a question in the code.

Consider investigating the impact of this flag on log processing and document the decision more clearly:

-        is_first_event: true, // NOTE: Maybe should be false
+        is_first_event: true, // Set to true as this is always considered a new event

or

-        is_first_event: true, // NOTE: Maybe should be false
+        is_first_event: false, // Set to false as this is processing existing data
src/event/format/json.rs (5)

67-131: Well-structured log flattening logic that handles multiple formats.

The new flatten_logs function centralizes the log processing logic for different sources (Kinesis, OTel logs/metrics/traces), making the code more maintainable and easier to extend with new log formats in the future.

The function effectively:

  1. Deserializes the input based on log source type
  2. Applies appropriate flattening logic for each format
  3. Handles both single objects and arrays consistently

140-146: Method signatures updated for consistency.

Method signatures have been updated to include additional parameters like time_partition_limit and log_source, ensuring consistent handling of these parameters throughout the event processing pipeline.

Also applies to: 155-155, 235-246


343-352: Improved key collection with HashSet.

Changing the return type from Result<Vec<&'a str>, ()> to HashSet<&'a str> is a good improvement:

  1. Uses a more appropriate data structure for unique keys
  2. Simplifies error handling by eliminating explicit error types
  3. Makes the code more concise

354-361: Simplified field mismatch detection.

The updated fields_mismatch function is more concise and readable, making use of Rust's idiomatic is_none_or pattern to check field validity.


452-1069: Comprehensive test coverage for JSON processing.

The extensive test suite covers various scenarios:

  • Basic objects with different types
  • Null handling
  • Schema derivation
  • Type mismatches
  • Empty objects
  • Arrays of objects
  • Nested types
  • Schema version differences

This thorough testing approach helps ensure the reliability of the JSON processing logic after the significant refactoring.

Comment on lines 157 to 158
let fields = collect_keys(flattened.iter());


💡 Verification agent

🧩 Analysis chain

Avoid using expect for collecting keys.

Currently, if collect_keys returns an empty set (e.g., if any log entry is not an object), this code might not handle it properly. Consider adding error handling to avoid unexpected behavior.

Check if the updated implementation properly handles the case when fields is empty:


🏁 Script executed:

#!/bin/bash
# Find all usages of collect_keys to verify error handling
rg -A 3 "collect_keys" --type rust

Length of output: 594


🏁 Script executed:

#!/bin/bash
# Search for any usage of `expect` in the file to confirm if it's still being used.
rg "expect(" src/event/format/json.rs

Length of output: 104


Review Check: Error Handling for Empty Key Collection

After verifying the relevant code areas, I confirm that the use of collect_keys no longer involves an expect call. However, the potential issue remains: if collect_keys(flattened.iter()) returns an empty set—such as when a log entry isn’t an object—this state isn’t explicitly managed. While the subsequent call to derive_arrow_schema uses an if let to check for a valid schema, it isn’t clear whether this gracefully handles the empty key scenario (or if it might lead to unexpected behavior further downstream).

To improve robustness, please consider one of the following:

  • Explicit Check: Add a conditional that verifies fields isn’t empty before proceeding.
  • Fallback/Error Handling: Document or implement a fallback mechanism (or an error log/early return) to handle cases when no keys are collected.

This change will help ensure the code behaves predictably, even when encountering non-object log entries.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
src/parseable/streams.rs (1)

116-124: ⚠️ Potential issue

New push_logs method has potential error handling issues.

The new method provides a streamlined interface for log ingestion, but it calls unwrap() on line 117 which could lead to panics in production if serialization fails.

Additionally, the method is marked as async but doesn't contain any await operations.

Apply these changes to improve error handling:

-pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
+pub fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
-    let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
+    let origin_size = serde_json::to_vec(&json)?.len() as u64; // string length need not be the same as byte length

    json::Event::new(json)
        .into_event(origin_size, self, log_source)?
        .process(self)?;

    Ok(())
}
🧹 Nitpick comments (12)
src/event/format/mod.rs (2)

116-116: Consider refactoring to reduce argument count
The #[allow(clippy::too_many_arguments)] hint suggests the function signature is getting bulky. For long-term maintainability, consider encapsulating related parameters into a dedicated struct or builder pattern.


153-154: Create a dedicated comment or function describing the "new_schema" preparation
Adding a short explanation or splitting into a helper function could improve clarity around the transformation steps for new_schema, especially considering time-partition updates and potential complexities in large schemas.

src/handlers/http/ingest.rs (2)

150-153: Refined OTEL metrics ingestion
This mirrors the pattern in logs ingestion. The uniform approach helps reduce code duplication, though a shared helper function might be considered if more OTEL variants arise.


257-257: Unchecked event processing
Calling process_unchecked(stream) avoids overhead from schema validation. While it boosts performance, keep in mind that it may also allow malformed records to enter your storage.

Would you like a fallback mechanism or error handling approach to reduce potential corruption when pushing data in "unchecked" mode?

src/event/format/json.rs (8)

33-37: Additional std and sync imports
Introducing HashMap, HashSet, and Arc is standard. If ingestion throughput is especially high, consider guard patterns to avoid overhead from frequent map operations.


67-131: Centralized flatten_logs function
Bringing all log-flattening logic under one function offers clarity. However, watch out for large JSON payloads causing bottlenecks at runtime. Depending on scale, consider streaming approaches or partial flattening to handle giant arrays.


157-157: Collecting keys for schema derivation
collect_keys(flattened.iter()) works well for simple merges. If partial or inconsistent records appear, it might lead to incomplete schemas or ingestion errors.


160-161: is_first event determination
Relying on whether a new schema is derived to set is_first is clever but might skip scenarios where the event is not literally “first” but still triggers a new field. Clarify or rename the variable if necessary.


231-239: Leverage stream-based partition config
Fetching partition configs like time_partition and custom_partitions from the stream ensures consistent ingestion logic. Just confirm that dynamic stream updates during ingestion do not conflict with these local references.


249-254: Extracting custom partitions
Splitting on commas for multiple custom partitions is straightforward. If user-supplied partition keys contain commas, you might need escaping or a more robust approach.


333-339: Schema derivation short-circuit
Returning None if a new field is found is straightforward. In future expansions, you might allow partial acceptance of new fields or dynamic updates rather than rejecting the entire batch.


355-360: Field mismatch detection
Inline fields_mismatch logic is easy to follow. In the future, consider returning a structured list of mismatch reasons to help users debug ingestion issues.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7b2db3 and dc34a85.

📒 Files selected for processing (7)
  • src/event/format/json.rs (9 hunks)
  • src/event/format/mod.rs (6 hunks)
  • src/event/mod.rs (2 hunks)
  • src/handlers/http/ingest.rs (6 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/parseable/streams.rs (3 hunks)
  • src/utils/arrow/flight.rs (1 hunks)
🔇 Additional comments (33)
src/parseable/mod.rs (1)

31-31: Good addition of Stream to the public exports.

Adding the Stream type to the list of exported entities makes it accessible to other modules that import from parseable, supporting the event handling refactoring. This change follows good Rust module design patterns.

src/utils/arrow/flight.rs (2)

98-101: Improved by directly using Stream object instead of just the name.

The code now gets the Stream object first and uses it consistently throughout the function. This provides better encapsulation and follows object-oriented principles.


105-106: Streamlined function call by passing stream reference.

Changed from passing a stream name to passing the Stream reference directly, which is more efficient as it avoids looking up the stream again inside push_logs_unchecked.

src/parseable/streams.rs (1)

530-531: Improved logging message clarity.

The warning message for file not found has been streamlined for better readability, using file.display() to show the file path.

src/event/mod.rs (4)

39-43: Good introduction of PartitionEvent to encapsulate related data.

Creating a dedicated struct to group related data (record batch, timestamp, and partition values) improves code organization and makes the relationships between these fields more explicit.


45-52: Improved Event struct with partitions vector.

The Event struct has been refactored to contain a vector of PartitionEvents, which allows handling multiple partitions per event. This design is more flexible and better supports complex event processing scenarios.


56-94: Refactored process method to handle multiple partitions.

The process method now iterates through each partition and applies the appropriate operations. This change supports the new multi-partition event model and properly delegates to the stream for data storage.

The method signature has also been updated to take a Stream reference, strengthening the relationship between Event and Stream.


98-110: Consistent refactoring of process_unchecked method.

The process_unchecked method has been updated to match the changes in the process method, maintaining consistency in the API. This method now also takes a Stream reference and processes multiple partitions.

src/event/format/mod.rs (7)

23-23: Use of NonZeroU32 for partition limit is appropriate
This import indicates the intention to avoid zero values for the time partition limit. It's a good approach to guarantee meaningful partition constraints.


36-40: New imports align with the extended event processing logic
Bringing in Stream, get_field, get_timestamp_array, replace_columns, and Json is consistent with the updated record batch construction flow. Ensure that any performance-sensitive areas involving these utilities have sufficient benchmarks, especially under high ingestion loads.


108-112: Additional parameters enhance flexibility
Extending fn to_data with time_partition_limit and custom_partition provides finer control over partitioning. Ensure any external calls to to_data are updated to supply valid values for these parameters or None as applicable.


117-125: New signature provides direct data injection
The updated into_recordbatch method signature is clearer about required data, schema, and partition details. Confirm that all call sites pass consistent arguments to prevent runtime mismatches in timestamp or schema references.


143-149: Validate schema mismatch logic
This inlined check for static schema mismatch is convenient. However, be sure that any nominal type differences (e.g., Float64 vs. Int64 for schema version upgrades) are gracefully handled to avoid frequent ingestion failures.


158-163: Column replacement for the timestamp
Replacing the first column with the computed timestamp array is a neat solution. Just verify that clients querying this data are aware that index 0 is now reserved for the system timestamp, preventing confusion.


165-165: Finalize record batch as expected
Returning the newly generated RecordBatch is straightforward. No issues spotted, provided that downstream consumers handle any schema modifications introduced by prior steps.

src/handlers/http/ingest.rs (8)

30-30: New import for LogSource
This import is consistent with the shift to a direct log source model in ingestion functions. Confirm consistent usage across handlers.


34-34: Access to PARSEABLE & Stream
Exposing Stream and PARSEABLE again emphasizes centralizing ingestion. Ensure concurrency controls exist for multi-threaded ingestion scenarios.


73-76: Refactored ingestion flow with direct push_logs
This refactor removes unnecessary flattening logic at the handler level, delegating directly to push_logs. It simplifies code but double-check that any exceptions raised in push_logs (e.g., schema mismatch) are handled or reported properly here.


83-87: Ingesting into an existing stream
Using get_stream with a potential fallback path organizes ingestion well. Just confirm there's a fallback or a user-facing error if the stream name is invalid or not found.


116-119: OTel logs ingestion for newly created stream
A consistent approach with get_or_create_stream. Make sure any changes in OTel data structure are thoroughly tested to avoid partial ingestion or data corruption.


181-184: OTel traces ingestion
Similar to logs and metrics, everything is abstracted into push_logs. Keep an eye on the potential performance overhead if OTel data sets are large.


233-236: Unified ingestion for explicit logstream
Pushes logs via the same method used in other ingestion endpoints. Check that usage analytics or logging facets remain consistent for user-defined vs. internal streams.


249-254: is_first_event: true potential logic discrepancy
Hard-coding is_first_event to true might misrepresent repeated ingestion calls. Clarify or dynamically determine whether an event truly is the first to avoid misleading metadata or schema transitions.

src/event/format/json.rs (10)

29-31: OpenTelemetry log, metrics, and trace imports
These references show that OTel data structures are parsed directly. Keep an eye on updates to these crates to stay compatible with future OTel versions.


40-41: Using EventFormat and LogSource
Aligns with the new trait-based approach to ingest various event types. This fosters a cleaner separation of concerns.


42-47: Flattening different log types
Imports for specialized flatteners (Kinesis, OTel logs/metrics/traces) indicate a consolidated method of turning raw JSON into standardized records. Ensure each flattener is thoroughly tested for unusual or edge-case formats.


133-135: type Data = Vec<Json>
Retaining a vector representation can be memory-intensive if logs are large. If memory usage becomes an issue, consider streaming or chunk-based ingestion.


140-146: Expanded to_data signature
Including new partition parameters and the log_source clarifies responsibilities within EventFormat. Double-check that all call sites supply consistent arguments, or fallback defaults.


147-155: Flatten logs before inferring schema
Invoking flatten_logs here is logical. If flattening fails for some edge case (e.g., deeply nested objects or partially invalid OTel data), ensure bubble-up error handling is clearly traceable.


225-230: New into_event signature
Taking stream and log_source directly avoids confusion of many separate parameters. Make sure no potential concurrency pitfalls arise from referencing the shared stream object in parallel actions.


259-263: Extracting time partition
Falling back to the event’s own timestamp if no partition key is present ensures consistent event ordering. Just confirm that the partial mismatch between local times and global ordering is acceptable for the system’s design.


341-343: Key collection from JSON objects
No explicit check for non-object values is performed here—flattening presumably ensures objects only. If flattened logs can contain non-object nodes, an early error might improve clarity.


423-424: Extra type definitions for test arrays
Reusing these type casts in tests is convenient. This organization helps keep test code tidy.


    Ok(HttpResponse::Ok().finish())
}

pub async fn push_logs_unchecked(
    batches: RecordBatch,
    stream_name: &str,
    stream: &Stream,

⚠️ Potential issue

Updated function signature for push_logs_unchecked
Accepting a &Stream parameter emphasizes the new ingestion pattern. Ensure concurrency safety whenever multiple threads attempt “unchecked” pushes simultaneously.

In highly concurrent deployments, consider locking or queueing events to avoid partial writes or race conditions in process_unchecked.

Comment on lines 264 to 279
    let rb = Self::into_recordbatch(
        p_timestamp,
        vec![json],
        schema.clone(),
        &storage_schema,
        static_schema_flag,
        time_partition.as_ref(),
        schema_version,
    )?;

    partitions.push(PartitionEvent {
        rb,
        parsed_timestamp,
        custom_partition_values,
    });
}

🛠️ Refactor suggestion

Per-event recordbatch creation
Generating a distinct RecordBatch for each item in the data array could be expensive for large volumes. Batching them together might improve performance if your usage tolerates it.
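If batching is adopted, a minimal sketch using arrow's concat_batches kernel, assuming the per-row batches for a given partition share the same schema:

use std::sync::Arc;

use arrow::{array::RecordBatch, compute::concat_batches, datatypes::Schema, error::ArrowError};

// Hypothetical: merge the per-row batches that land in the same partition into
// one RecordBatch before constructing its PartitionEvent.
fn merge_partition_batches(
    schema: &Arc<Schema>,
    batches: &[RecordBatch],
) -> Result<RecordBatch, ArrowError> {
    concat_batches(schema, batches)
}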


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
src/event/format/json.rs (1)

357-366: Simplified collect_keys implementation but needs error handling

The collect_keys function now returns a HashSet<&'a str> instead of an Option, which simplifies code but means there's no built-in way to check if the set is empty or if no valid fields were collected.

Consider adding explicit handling for the empty set case in the to_data method to avoid issues with schema inference.

- let fields = collect_keys(flattened.iter());
+ let fields = collect_keys(flattened.iter());
+ if fields.is_empty() {
+     return Err(anyhow!("Failed to collect any valid fields from the provided JSON objects"));
+ }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dc34a85 and 4c1f6d8.

📒 Files selected for processing (6)
  • Cargo.toml (1 hunks)
  • src/event/format/json.rs (9 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/ingest.rs (6 hunks)
  • src/handlers/http/query.rs (1 hunks)
  • src/parseable/streams.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (21)
Cargo.toml (1)

12-13: Updated arrow dependencies for better consistency.

The change simplifies the arrow-array dependency declaration while adding the core arrow crate with matching versions. This looks like a good improvement for dependency management.

src/handlers/http/query.rs (1)

175-177: Improved schema management using stream abstraction.

The refactoring improves encapsulation by having the stream instance handle its own schema commitment rather than using a standalone function. This aligns well with object-oriented principles and the broader refactoring pattern in this PR.

src/event/mod.rs (4)

34-38: Good introduction of PartitionEvent struct for better data encapsulation.

The new PartitionEvent struct cleanly encapsulates related fields (record batch, timestamp, and partition values) that were previously separate in the Event struct. This improves code organization and maintainability.


40-47: Well-structured Event refactoring to support multi-partition processing.

The Event struct now uses a HashMap to store multiple PartitionEvent instances, which provides more flexibility for handling logs with different partitioning schemes. This change is aligned with the overall refactoring goals and enhances the system's capability to process varied log formats.


51-75: Improved process method with cleaner iteration over partitions.

The refactored process method properly handles multiple partitions through iteration, maintaining the same functionality while being more flexible. The schema commitment, data pushing, and stat updating are all properly adapted to work with the new structure.


79-88: Enhanced process_unchecked method for consistent partition handling.

The updated method correctly iterates over partitions in the same way as the main process method, maintaining consistency in the codebase.

src/handlers/http/ingest.rs (9)

73-77: Simplified log ingestion with direct push_logs call.

This approach centralizes log processing logic and provides a cleaner separation of concerns between HTTP handling and log processing. The code is now more maintainable with reduced complexity at the handler level.


83-87: Streamlined internal stream ingestion using push_logs.

The ingestion process for internal streams has been simplified by directly using the push_logs method, which removes redundant code and improves maintainability.


116-119: Consistent use of push_logs for OTEL logs ingestion.

The refactoring applies the same pattern across all ingestion methods, which improves code consistency and makes future maintenance easier.


150-153: Consistent use of push_logs for OTEL metrics ingestion.

Following the same pattern as other ingestion methods maintains consistency across the codebase.


181-184: Consistent use of push_logs for OTEL traces ingestion.

The consistent application of the same pattern across all ingestion methods is excellent for code maintainability.


233-236: Consistent use of push_logs for direct event posting.

This maintains the same pattern established throughout the file, which is good for consistency and maintainability.


242-244: Updated function signature for push_logs_unchecked

The function now accepts a &Stream parameter directly, which aligns with the new ingestion pattern and emphasizes the stream-centric approach to log handling.

When deploying to highly concurrent environments, ensure proper synchronization mechanisms are in place to avoid race conditions when multiple threads attempt "unchecked" pushes simultaneously.


250-259: Well-structured Event construction using the new PartitionEvent format.

The creation of the unchecked event correctly uses the new partitioned structure with a consistent key generation based on schema fields. This aligns well with the changes in the event module.


263-263: Properly updated process_unchecked call with stream parameter.

The call to process_unchecked now correctly passes the stream parameter, matching the updated method signature in the Event struct.

src/event/format/json.rs (6)

23-52: Clean import organization and new dependencies

The imports have been properly updated to include necessary dependencies for the refactored event handling - OpenTelemetry proto types, specific collection types, and non-zero integers. The organization into distinct blocks improves readability.


68-132: Well-structured log flattening abstraction

The new flatten_logs function effectively centralizes the logic for handling different log sources (Kinesis, OpenTelemetry logs/traces/metrics) into a single function, which is excellent for maintainability. The pattern matching with detailed comments for each case makes the code easy to follow.


137-207: Improved to_data method with clean error handling

The to_data method has been nicely refactored with:

  1. Clearer parameter naming (stored_schema vs previous schema)
  2. Proper use of the new flatten_logs function

However, the code at line 158 still doesn't properly handle the case when collect_keys returns an empty set.

#!/bin/bash
# Check if collect_keys function is properly handling empty sets
rg "collect_keys" src/event/format/json.rs -B 2 -A 5

210-223: Simplified return type for consistency

Changing the return type from Result<RecordBatch, anyhow::Error> to anyhow::Result<RecordBatch> improves code consistency while maintaining the same functionality.


227-304: Excellent parameter reduction in into_event method

The method has been significantly improved by:

  1. Taking references to structured objects (Stream and LogSource) rather than individual parameters
  2. Extracting values from these objects when needed
  3. Using proper error handling with the ? operator

This change reduces parameter count and makes the API more maintainable.


467-1083: Comprehensive test coverage for different scenarios

Excellent work adding thorough test cases that cover:

  1. Basic JSON object processing
  2. Null value handling
  3. Schema derivation and compatibility
  4. Array processing
  5. Nested type handling
  6. Schema version differences

This comprehensive test suite will help ensure the refactored code maintains correct behavior across all scenarios.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4c1f6d8 and 5a2bcc1.

📒 Files selected for processing (9)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/format/json.rs (9 hunks)
  • src/event/format/mod.rs (6 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/cluster/mod.rs (2 hunks)
  • src/handlers/http/cluster/utils.rs (2 hunks)
  • src/handlers/http/ingest.rs (9 hunks)
  • src/metadata.rs (1 hunks)
  • src/parseable/streams.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/ingest.rs
🧰 Additional context used
🪛 GitHub Actions: Lint, Test and Coverage Report
src/handlers/http/cluster/utils.rs

[error] 291-291: Clippy: useless conversion to the same type: actix_web::Error. Consider removing .into(): ErrorPayloadTooLarge(byte_size).


[error] 303-303: Clippy: redundant closure. Replace the closure with the function itself: JsonPayloadError::Deserialize.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (20)
src/handlers/http/cluster/utils.rs (2)

284-297: Efficient payload handling and size limiting

The streaming approach to collect bytes chunk by chunk with size limit checks provides an effective defense against potential DoS attacks. The early return with ErrorPayloadTooLarge ensures that oversized payloads are rejected before processing the entire request.

🧰 Tools
🪛 GitHub Actions: Lint, Test and Coverage Report

[error] 291-291: Clippy: useless conversion to the same type: actix_web::Error. Consider removing .into(): ErrorPayloadTooLarge(byte_size).
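A simplified sketch of that size-limited collection loop, written against the public actix-web API; the constant name and limit are assumptions, and the real extractor wraps this in a FromRequest implementation before deserializing the collected bytes into T:

use actix_web::{error::ErrorPayloadTooLarge, web, Error};
use futures_util::StreamExt;

// Assumed limit, for illustration only.
const MAX_EVENT_PAYLOAD_SIZE: usize = 10 * 1024 * 1024;

// Collect the request body chunk by chunk, rejecting it as soon as the
// accumulated size exceeds the limit instead of buffering an oversized payload.
async fn collect_body(mut payload: web::Payload) -> Result<(web::BytesMut, usize), Error> {
    let mut body = web::BytesMut::new();
    let mut byte_size = 0;
    while let Some(chunk) = payload.next().await {
        let chunk = chunk?;
        byte_size += chunk.len();
        if byte_size > MAX_EVENT_PAYLOAD_SIZE {
            return Err(ErrorPayloadTooLarge(byte_size));
        }
        body.extend_from_slice(&chunk);
    }
    Ok((body, byte_size))
}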


19-36: Appropriate imports for the new functionality

All the necessary imports for handling futures, payloads, and error types have been added to support the new JsonWithSize<T> implementation.

src/metadata.rs (1)

38-38: Type change from u64 to usize for size parameter.

This change aligns the size parameter type with the origin_size field in the Event struct (in src/event/mod.rs), providing type consistency across the codebase. The change from u64 to usize is appropriate as usize is the natural type for memory-related operations in Rust.

src/handlers/http/cluster/mod.rs (1)

777-797: Streamlined cluster metrics ingestion workflow.

The refactored code improves error handling and provides a more direct approach to processing cluster metrics. The code now:

  1. Properly checks if metrics are available before processing
  2. Calculates the byte size correctly
  3. Uses the new push_logs method from the Stream interface
  4. Includes appropriate error handling and logging

This change is part of the broader event handling refactoring and provides a more consistent approach to log ingestion across the codebase.

src/connectors/kafka/processor.rs (2)

39-69: Improved Kafka record processing with direct log ingestion.

The refactored method process_event_from_chunk greatly simplifies the event processing flow by:

  1. Removing unnecessary schema-related attribute retrievals
  2. Directly pushing logs to the stream using the new push_logs method
  3. Tracking and returning the payload size instead of constructing an event object

This approach is more efficient and aligns with the broader refactoring of event handling in the application.


78-80: Updated process call and logging to match new return type.

The method call and logging statement have been correctly updated to match the new return type (usize for payload size instead of an event object).

src/parseable/streams.rs (3)

116-127: New push_logs method provides streamlined log ingestion.

This new method offers a simplified interface for log processing by directly creating a JSON event and processing it. The method signature appropriately takes the JSON value, origin size, and log source as parameters, making the interface cleaner for callers.

The implementation is straightforward and handles the full event processing flow in a single method call, which aligns with the broader refactoring of event handling in the codebase.


630-641: New commit_schema method for schema management.

This method provides a clean way to merge a new schema with the existing one and update the metadata. The implementation correctly:

  1. Retrieves the current schema
  2. Merges it with the new schema
  3. Converts the fields to a map for storage in the metadata
  4. Updates the metadata with the merged schema

This addition enhances the Stream struct's capabilities for schema management.
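A minimal sketch of that merge step, assuming the stream metadata keeps fields as a name-to-field map (the exact layout is not shown in this review):

use std::{collections::HashMap, sync::Arc};

use arrow_schema::{Field, Schema};

// Hypothetical: merge the incoming schema into the current one and flatten the
// result into the name -> field map kept in the stream metadata.
fn merged_field_map(current: Schema, new: Schema) -> anyhow::Result<HashMap<String, Arc<Field>>> {
    let merged = Schema::try_merge([current, new])?;
    Ok(merged
        .fields()
        .iter()
        .map(|field| (field.name().clone(), Arc::clone(field)))
        .collect())
}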


533-534: Simplified warning log statement.

The multi-line warning has been consolidated into a single line for improved readability without changing its functionality.

src/event/mod.rs (4)

34-38: Good addition of the PartitionEvent struct.

The introduction of this struct nicely encapsulates the related data (record batch, timestamp, and partition values) that was previously scattered across the Event struct. This improves code organization and makes the data relationships clearer.


40-46: Event struct now uses a partition-based approach.

The refactoring from individual fields to a map-based partitioning approach is a good architectural decision that makes the code more flexible for handling diverse log sources and partitioning schemes.

Note the type change from u64 to usize for origin_size, which aligns better with Rust's memory model.
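
Read together, the two comments above suggest shapes along these lines; the field names are taken from this review and should be treated as illustrative, not as the PR's exact definitions:

use std::collections::HashMap;
use arrow_array::RecordBatch;
use chrono::NaiveDateTime;

// One record batch plus the metadata needed to place it in a partition.
pub struct PartitionEvent {
    pub rb: RecordBatch,
    pub parsed_timestamp: NaiveDateTime,
    pub custom_partition_values: HashMap<String, String>,
}

// The event now carries a map of partition prefix -> PartitionEvent instead of flat fields.
pub struct Event {
    pub origin_format: &'static str,
    pub origin_size: usize,            // was u64
    pub is_first_event: bool,
    pub partitions: HashMap<String, PartitionEvent>,
}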


51-75: Processing logic appropriately updated for multi-partition support.

The process method has been effectively refactored to handle multiple partitions. Each partition is now processed independently with its own schema, timestamp, and custom partition values.

The change to pass the stream parameter explicitly rather than accessing it through a global or cached reference improves testability and makes dependencies clearer.
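
A simplified sketch of that loop, with Stream method names assumed from the surrounding comments (the PR's actual ordering, stats updates, and live-tail handling may differ):

impl Event {
    // Sketch only: push/commit_schema/update_stats signatures are assumptions.
    pub fn process(self, stream: &Stream) -> anyhow::Result<()> {
        for (prefix, partition) in &self.partitions {
            // Commit the (possibly extended) schema only for the first event that introduces it
            if self.is_first_event {
                stream.commit_schema(partition.rb.schema().as_ref().clone())?;
            }
            // Stage this partition's record batch under its prefix
            stream.push(prefix, &partition.rb, partition.parsed_timestamp)?;
        }
        stream.update_stats(self.origin_size); // account for the ingested bytes
        Ok(())
    }
}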


79-88: Well-structured process_unchecked method.

This method has been properly updated to match the changes in the process method, maintaining consistent behavior between the two methods.

src/event/format/mod.rs (2)

104-112: Improved to_data method signature with explicit parameters.

The addition of time_partition_limit, custom_partition, and log_source parameters makes the function more flexible and reduces the need for global state or context objects.


117-166: Well-refactored into_recordbatch method with simplified schema validation.

The previous separate is_schema_matching method has been replaced with inline schema validation logic that:

  1. Is more concise and easier to understand
  2. Directly expresses the validation condition
  3. Uses modern Rust pattern with is_none_or for cleaner code

This change improves readability while maintaining the same functionality.

src/event/format/json.rs (5)

137-207: Refactored to_data method with improved parameter handling.

The method now properly uses the new flatten_logs function and handles the schema derivation more cleanly. The variable rename from schema to stored_schema improves clarity by distinguishing it from the derived schema.

However, there's still a potential issue with the error handling for empty key collections:

The implementation should more explicitly handle the case when fields is empty (line 158). While the code uses conditional logic to create a schema, it's not entirely clear how an empty field set would be handled downstream.


227-303: Significantly improved into_event implementation.

The implementation now:

  1. Extracts all necessary configuration from the stream object
  2. Creates partition-specific record batches
  3. Uses appropriate key generation with schema and partition values
  4. Concisely concatenates record batches for the same partition

This approach handles partitioning more effectively than the previous implementation and properly implements the partition-based architecture.
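
As a small standalone illustration of the concatenation step, using arrow's concat_batches (which the new code imports):

use arrow::compute::concat_batches;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Merge all record batches that landed in the same partition into one batch.
// Assumes `batches` is non-empty and that every batch shares the same schema.
fn merge_partition_batches(batches: &[RecordBatch]) -> Result<RecordBatch, ArrowError> {
    concat_batches(&batches[0].schema(), batches)
}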


355-366: Simplified collect_keys function with better return type.

The function now returns a HashSet<&'a str> instead of a complex Result type, making it easier to use and understand.
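
For reference, collecting the keys of a flattened JSON object is now essentially a one-liner (illustrative, not the PR's exact code):

use std::collections::HashSet;
use serde_json::{Map, Value};

// Gather the field names of a flattened JSON object for schema lookup.
fn collect_keys(object: &Map<String, Value>) -> HashSet<&str> {
    object.keys().map(String::as_str).collect()
}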


368-375: Improved fields_mismatch implementation.

The use of is_none_or makes the code more concise and readable. The function now checks if the field doesn't exist in the schema or has an invalid type more clearly.
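
A minimal standalone example of the is_none_or pattern referred to here, assuming the arrow_schema types used elsewhere in the file (the actual type-validation closure in the PR is more involved):

use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Field};

// A field "mismatches" when it is missing from the stored schema or carries
// a different data type than the incoming value implies.
fn field_mismatches(stored: &HashMap<String, Arc<Field>>, name: &str, expected: &DataType) -> bool {
    stored.get(name).is_none_or(|field| field.data_type() != expected)
}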


467-1083: Comprehensive test suite additions.

The addition of extensive unit tests is excellent. The tests cover:

  1. Basic object conversion
  2. Null handling
  3. Schema derivation
  4. Schema mismatch cases
  5. Nested type handling
  6. Schema version differences

This thorough test coverage will help ensure the refactored code works correctly and prevent future regressions.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/parseable/streams.rs (1)

262-266: Consider avoiding .expect(...) in production code.
Using .expect("Schema File should exist") will cause a panic if the file is missing or inaccessible. It would be beneficial to handle this more gracefully by returning a custom error or skipping the file.

Below is a potential fix:

-for path in self.schema_files() {
-    let file = File::open(path).expect("Schema File should exist");
-    if let Ok(schema) = serde_json::from_reader(BufReader::new(file)) {
-        schemas.push(schema);
-    }
+for path in self.schema_files() {
+    let file = File::open(&path).map_err(|e| {
+        tracing::error!("Failed to open schema file {path:?}: {e}");
+        StagingError::Create // or any relevant error variant
+    })?;
+
+    if let Ok(schema) = serde_json::from_reader(BufReader::new(file)) {
+        schemas.push(schema);
+    }
}
src/connectors/kafka/processor.rs (1)

41-72: Check partial parse error handling.
Records that fail JSON deserialization (serde_json::from_slice) are silently dropped. Consider logging these parse errors or returning them to track ingestion failures.

src/event/format/json.rs (1)

71-134: New flatten_logs implementation.
The approach handles different log sources (Kinesis, Otel, etc.) effectively, then flattens each event. Watch out for the unreachable!() branch at line 129—consider a more explicit error to safeguard against unexpected input.

-                _ => unreachable!("flatten would have failed beforehand"),
+                other => return Err(anyhow!("Unexpected JSON format: {other:?}"))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a86fc47 and 51d166e.

📒 Files selected for processing (6)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/format/json.rs (8 hunks)
  • src/event/format/mod.rs (6 hunks)
  • src/handlers/http/cluster/mod.rs (3 hunks)
  • src/handlers/http/ingest.rs (9 hunks)
  • src/parseable/streams.rs (10 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/handlers/http/cluster/mod.rs
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (30)
src/parseable/streams.rs (5)

23-23: No issues with the new import.
Thank you for including BufReader; it seems necessary for reading schema files.


62-62: LGTM for usage of DiskWriter.
The import is consistent with the new staged disk writing approach.


129-139: Safe creation of DiskWriter after path resolution.
The approach of creating the directory if it doesn't exist and then instantiating a DiskWriter is correct and robust.


159-159: Clear filename format for parted data.
Incorporating date, hour, minute, and hostname in the filename improves traceability and structure.


596-607: Schema merging appears logically sound.
Merging the current schema with the new one, then storing it in metadata, is a clean approach.

src/connectors/kafka/processor.rs (2)

30-30: Import consolidation is fine.
No issues with referencing LogSource from the same line of imports.


81-83: Reporting of record count and size is well-done.
Providing debug logs with the total processed size is helpful for observing ingestion performance.

src/event/format/mod.rs (6)

23-23: Import of NonZeroU32 recognized.
No concerns.


36-40: Neat addition of parseable::Stream and arrow utils.
Adding these references is consistent with new event handling refactors.


106-112: Extended signature in to_data method.
The inclusion of static_schema_flag, time_partition_limit, and custom_partitions expands flexibility in event formatting without complicating the interface excessively.


117-147: Logic in prepare_and_validate_schema.
Nicely ensures the DEFAULT_TIMESTAMP_KEY isn't overwritten and checks for static schema mismatches. This approach is appropriate for dynamic vs. static checks.


149-170: Implementation of into_recordbatch method is straightforward.
Replacing columns with the newly generated timestamp array is a clear approach. No obvious issues.


171-172: Method signature for into_event updated.
Accepting a reference to Stream is simpler than passing multiple parameters.

src/event/format/json.rs (17)

23-23: Using arrow::compute::concat_batches.
No concerns, helps with batch concatenation.


30-32: Inclusion of OpenTelemetry log/metrics/traces Data structures.
This improves the code’s ability to handle more telemetry sources.


35-38: Extended usage of HashMap, HashSet, NonZeroU32, and Arc.
These additions align well with the additional logic needed for partitioning and concurrency.


41-52: Imported references for EventFormat, log flattening, and new modules.
No issues found.


54-54: Introducing origin_size and log_source into Event.
These fields improve traceability of event size and provide context for specialized flattening logic. Implementation in the new constructor is straightforward.

Also applies to: 58-58, 62-69


138-210: EventFormat trait implementation expansions.
The new logic in to_data merges inferred schema with stored schema and includes prepare_and_validate_schema. The error flows are well-structured.


214-214: Simplifying the return type of decode.
Returning anyhow::Result<RecordBatch> fosters consistent error handling.


228-299: Reworked into_event method.
Informs partition logic with custom_partitions and time_partition, then concatenates record batches. This is a clean approach to final event assembly.


305-310: Clear error message for schema mismatch.
Communicating the mismatch reason fosters easier debugging.


333-343: Good practice for partition-based time extraction.
The function returns a dedicated error if the field is missing or unparseable.
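
A sketch of that extraction, leaning on serde's chrono support as noted later in this review (the return type and error wording are assumptions):

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::{Map, Value};

// Pull the configured time-partition field out of a flattened JSON object
// and parse it, erroring out if it is missing or unparseable.
fn extract_and_parse_time(json: &Map<String, Value>, time_partition: &str) -> anyhow::Result<NaiveDateTime> {
    let value = json
        .get(time_partition)
        .ok_or_else(|| anyhow!("missing time partition field: {time_partition}"))?;
    let timestamp: DateTime<Utc> = serde_json::from_value(value.clone())?;
    Ok(timestamp.naive_utc())
}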


347-358: Conditionally deriving arrow schema.
Returning None if a new field is not found in the existing schema is consistent with the approach.


360-371: Collecting object keys into a set.
This is straightforward, but watch out for potential collisions if keys differ only by case, though that's likely not an immediate concern.


373-380: Logical check for field presence and type validation.
No issues found.


442-445: Additional test imports (Int64Type, etc.).
No concerns.


449-451: Test covering parse_time_partition_from_value.
Verifying the correct parse of an ISO8601 date/time string is crucial.


459-460: Test ensuring time_partition field presence.
The error scenario is validated properly.


467-468: Test verifying string parse failure for invalid date/time.
Covers a negative path for time parsing.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

70-133: 🛠️ Refactor suggestion

flatten_logs function handles multiple log sources.

  1. The _ => unreachable!() for the final branch could panic in production if unexpected data sneaks through. Replacing it with a proper error return would improve robustness.
  2. Overall structure is clear and well-documented with inline comments.
- _ => unreachable!("flatten would have failed beforehand"),
+ other => return Err(anyhow!("Encountered an unexpected JSON type: {other:?}")),
🧹 Nitpick comments (4)
src/event/mod.rs (2)

34-40: Add documentation for the new PartitionEvent struct.
While the struct fields are self-explanatory, attaching doc comments to each field would ease understanding for contributors unfamiliar with this code.


82-93: process_unchecked lacks stats and schema commits.
The distinction from process is clear, yet consider documenting the rationale behind skipping commits and stats updates for clarity.

src/event/format/json.rs (2)

61-61: New constructor Event::new(...).
This is straightforward, though consider adding doc comments to clarify how origin_size is measured (e.g., raw bytes).


261-261: Extract custom partition values if configured.
Implementation is standard. Consider ignoring empty or null fields explicitly if relevant.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 51d166e and 5d87407.

📒 Files selected for processing (4)
  • src/event/format/json.rs (7 hunks)
  • src/event/format/mod.rs (7 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/ingest.rs (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (67)
src/event/mod.rs (6)

23-23: Imports look good.
No particular issues to report on this import statement.


28-28: Additional imports to support streaming and storage.
No concerns, as these are consistent with the references below.


42-42: #[derive(Debug)] on Event is beneficial.
No issues; this is consistent with typical Rust debugging needs.


45-45: Confirm feasibility of changing origin_size to usize.
Depending on the system architecture, large event sizes might exceed usize limits on 32-bit systems. If extremely large events are possible, re-evaluating u64 could be prudent.


48-48: Introducing partitions to group record batches.
This addition makes the event struct more flexible and extensible.


54-80: Validate concurrency and error handling within process.
The loop and minor condition checks appear correct; schema is only committed once if is_first_event is true. Ensure no concurrency hazards occur if multiple threads call this simultaneously, though it likely meets current requirements.

src/event/format/json.rs (44)

29-31: New OpenTelemetry imports.
These seem necessary for handling logs, metrics, and trace data. No issues here.


33-37: Added standard library imports.
All are relevant for the new logic. No concerns.


40-51: Reorganized imports for the json module.
The references to PartitionEvent and flattening helpers are consistent with the code below.


55-57: Updated fields in Event struct (origin_size and log_source).
No immediate issues, but ensure the log_source field is consistently set in all code paths to prevent confusion down the line.


139-139: to_data method refactor and new parameters.
The added time_partition_limit and custom_partitions integration align with the extended flattening logic. Implementation looks coherent.

Also applies to: 144-145, 147-148


156-162: infer_schema signature updated.
Accepting a single data element plus additional parameters is reasonable. This clarifies usage and moves towards consistent design.


164-164: Collecting keys from a single JSON object.
Ensure the object is validated before usage. The approach is otherwise straightforward.


167-169: Checking derived schema from stored schema.
Looks correct; returns Some(...) or proceeds to infer. No issues noted.


195-195: Marking is_first = true when a new schema is derived.
This logic is consistent with the notion of a newly created or expanded schema.


210-210: Call to prepare_and_validate_schema.
Helps keep the schema consistent with storage constraints. No concerns.


223-223: serialize(data)? usage.
This can produce an error if data is malformed. Properly handled by the ? operator, so it's fine.


232-232: into_event entry point for building Event.
Clean integration with the Stream.


241-243: Extract p_timestamp, origin_size, and build data.
Keeps logic consistent with earlier changes to Event.


249-249: is_first_event defaulting to false.
Reasonable default until proven otherwise in the loop.


250-250: Initialize partitions as HashMap::new().
No remarks; standard usage.


252-252: Iterate over flattened data.
Ensures each JSON chunk is processed individually.


253-253: Infer schema for each JSON record.
Allows dynamic adaptation if the data shape changes record by record, though repeated schema inference can have performance costs.


260-260: Accumulate is_first_event.
Straightforward OR logic to track any new schema events.


269-269: Derive parsed_timestamp from JSON or fallback.
Reasonable approach, deferring to the time_partition presence.


274-274: Create RecordBatch with into_recordbatch.
Flow is consistent with the trait-based approach.


281-281: Access final schema from the created batch.
Avoids storing stale references; no issues.


283-283: Generate partition key from schema hash.
Implementation detail is crisp, referencing get_schema_key.


289-289: Append custom partition kv pairs to partition key.
Straightforward approach; sorting ensures deterministic ordering.


293-306: Construct or update PartitionEvent within the HashMap.
Logic is idiomatic: push to existing or insert a new struct. No concerns.


309-309: Finalize the newly built Event.
Fields align with earlier definitions, reusing "json" as origin_format.


323-323: extract_custom_partition_values function signature.
Well-scoped. Ensure robust error handling if the partition field is missing from JSON.


345-345: extract_and_parse_time function.
Returns an error if the field is absent or unparsable. Good approach.


358-359: derive_arrow_schema returns Option<Vec<Arc<Field>>>.
When it returns None, new fields are inferred. Implementation is consistent.


370-371: collect_keys from a single JSON object.
Simple approach to gather field names. No issues.


375-376: fields_mismatch checks correctness for known fields.
Ensures data type alignment. Reasonable design.


443-444: Importing additional array types for testing.
No issues; these are used in test assertions.


474-492: TestExt trait for accessing typed arrays in tests.
Enhances readability in test code. Good approach.


494-496: Helper: fields_to_map.
Functional for building test schemas quickly.


498-529: Test basic_object_into_rb.
Covers standard object with numeric/string fields. Good coverage.


531-559: Test basic_object_with_null_into_rb.
Verifies null handling. Implementation is correct.


560-593: Test basic_object_derive_schema_into_rb.
Ensures it merges with stored schema. Good scenario coverage.


595-618: Test basic_object_schema_mismatch.
Properly expects an error if stored schema conflicts. Good negative test.


620-643: Test empty_object.
Confirms partial schema usage and ensures at least one column. Looks valid.


646-694: Test array_into_recordbatch_inffered_schema.
Verifies inference across multiple objects in an array. Comprehensive coverage.


697-739: Test arr_with_null_into_rb.
Properly checks mixing null and valid data. No issues.


742-791: Test arr_with_null_derive_schema_into_rb.
Similar coverage, ensuring merges with existing schema that includes potential null fields.


794-829: Test arr_schema_mismatch.
Expects an error when numeric/string type conflict arises. Solid negative test.


832-907: Test arr_obj_with_nested_type.
Covers arrays within objects. Thorough approach verifying list array behavior.


909-967: Test arr_obj_with_nested_type_v1.
Similar coverage under schema version V1 with forced float conversions. Good consistency check.

src/event/format/mod.rs (17)

23-23: Introducing NonZeroU32 in the format module.
Used to limit certain partition-based logic (e.g., size). Looks good.


27-27: use anyhow::anyhow;
Used for error management. No issues found.


36-36: Importing parseable::Stream.
Consistent with the rest of the code integration.


101-101: Type alias IsFirstEvent = bool.
Improves readability when returning the "is first" status from infer_schema.


109-110: Added time_partition_limit and custom_partitions to trait.
Aligns with the changes in json.rs, boosting extensibility.


114-114: New infer_schema method signature includes IsFirstEvent.
Clear approach to capturing both the schema and a boolean result in one call.


122-122: decode now handles slices of data.
Enables batch-based decoding. This is consistent with the new approach in json.rs.


125-125: Simplified prepare_and_validate_schema returning EventSchema.
Refined approach for finalizing the schema with reserved fields. Straightforward.


144-155: Validate static schema constraints during preparation.
Ensures the new event fields don't conflict with stored schema if static_schema_flag is set. Proper error return if mismatch occurs.


157-157: Refined into_recordbatch signature.
Directly injects p_timestamp and uses the simpler approach with data: &[Self::Data].


163-163: Using Arc::new(Schema::new(schema.clone())).
References a cloned vector of Field. No immediate concerns, though defensive copying might have a minimal overhead. Acceptable in current design.


167-167: Call to update_field_type_in_schema with None for log_records.
Defers final type adjustments for time partition logic. Implementation is cohesive.


173-173: Replacing timestamp column at index 0.
Good approach to universally apply p_timestamp.


233-233: update_field_type_in_schema function
Renamed/refined to handle new partition/time logic. Implementation remains consistent with the extended design.


237-237: log_records: Option<&Json> as an argument.
Allows dynamic adjustments based on actual data. Good design choice.


248-249: Handling override of data type based on log_record.
Applies custom rules introduced for schema version V1. No issues.


279-280: override_data_type modifies field types.
Handles new fields based on heuristics (time or numeric). This is a straightforward approach that helps unify data.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/event/format/json.rs (1)

323-329: ⚠️ Potential issue

Avoid panic when partition field is absent.
Using .unwrap() can crash if the custom partition key doesn’t exist. Return an error or skip instead.

-    let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+    let Some(val) = json.get(custom_partition_field.trim()) else {
+        return Err(anyhow!("Missing custom partition field: {}", custom_partition_field));
+    };
+    let custom_partition_value = val.to_owned();
🧹 Nitpick comments (5)
src/event/format/json.rs (5)

55-66: Consider adding doc comments for new fields.
origin_size and log_source would benefit from small doc comments or inline explanations, clarifying their purpose and usage.


70-133: Avoid using unreachable!() in production code.
Line 128 uses unreachable!(), which can panic if the assumption ever becomes invalid. It's safer to return an error, preserving runtime stability.

-                _ => unreachable!("flatten would have failed beforehand"),
+                other => return Err(anyhow!("Expected object or array, got: {other:?}")),

156-212: Schema inference may benefit from caching or logs.
This dynamic approach works, but frequent merges can degrade performance in high-throughput scenarios. Adding caching or debug logs for newly inferred fields could improve maintainability.


233-307: Break down large method for clarity.
into_event is extensive. Consider extracting partition logic, time extraction, and schema inference steps into helper functions for readability and maintainability.


369-373: Doc comment is out of sync with return type.
The comment says "Returns None" but the function returns a HashSet<&str> unconditionally. Update the comment to reflect actual behavior.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5d87407 and 08fea7b.

📒 Files selected for processing (1)
  • src/event/format/json.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (20)
src/event/format/json.rs (20)

29-37: No issues with the new imports.
The additional imports for OpenTelemetry proto, collections, and synchronization primitives look appropriate.


40-51: All imports from local modules look consistent.
No concerns here—these imports are well-grouped and straightforward.


136-137: Type alias usage is clear.
Setting type Data = Json; aligns well with the rest of the implementation.


144-154: Signature change is logical.
Expanding parameters to include time_partition_limit and custom_partitions looks reasonable. Clear separation of concerns in to_data().


217-223: RecordBatch decoding logic is succinct.
Serializing data into Arrow is straightforward, and error handling looks fine here.


344-345: Timestamp extraction looks sound.
Relying on Serde to parse the field as a DateTime<Utc> is convenient and straightforward.


358-367: Early exit if any key is missing.
Returning None as soon as a schema field isn't found aligns with the code's approach of deferring to inference later. No issues here.


375-381: Field mismatch logic looks correct.
The function properly checks for unrecognized or invalid field types. Good approach for schema consistency.


443-444: Expanded imports for tests are valid.
They provide essential Arrow and array conversions used in subsequent tests.


499-530: Test coverage for basic JSON object ingestion looks good.
This test ensures integer, float, and string fields are properly captured. No issues noted.


531-559: Test with null fields is comprehensive.
Confirms that null fields don’t break ingestion. Great for verifying optional columns.


560-594: Schema derivation test is thorough.
Checks an existing schema merging in new fields. Implementation matches expected results.


595-618: Properly detects schema mismatch.
Ensures type differences raise errors. Good coverage for invalid input scenarios.


620-643: Empty object ingestion test is valid.
Verifies behavior when the JSON has no fields at all. Helps confirm robust schema handling.


645-694: Array ingestion and schema inference test.
Multiple objects with varying fields are a common scenario. This test helps validate partial field presence.


696-739: Array ingestion with null fields.
Verifies that partial null usage across rows is properly interpreted as optional columns.


741-791: Array ingestion with null plus existing stored schema.
Ensures stored schema merges gracefully with newly inferred columns.


793-819: Array schema mismatch test.
Correctly fails when new data conflicts with existing types.


821-897: Nested list ingestion test.
Checks arrays of integers, combined with other fields, verifying complex structure. No issues observed.


899-957: Float64 casting test for schema version V1.
Demonstrates handling numeric fields as floats in older or v1-based schemas. Coverage is solid.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

70-132: 🛠️ Refactor suggestion

Avoid using unreachable!() for data validation.
Although the logic suggests this branch won’t be reached, unexpected inputs in production could trigger a panic. Replace unreachable!() with a more descriptive error, preventing inadvertent panics.

-                _ => unreachable!("flatten would have failed beforehand"),
+                _ => return Err(anyhow!(
+                    "Unexpected JSON type encountered. Please verify inputs."
+                )),
🧹 Nitpick comments (3)
src/event/mod.rs (2)

34-40: Add explanatory documentation for PartitionEvent.
While this struct is straightforward, a short doc comment would help clarify its purpose and usage for future maintainers.


54-80: Consider consolidating repeated actions.
Calling commit_schema, push, update_stats, and LIVETAIL.process in the same loop may be acceptable, but running them repeatedly for multiple partitions could add overhead. Check whether some operations should be performed once outside the loop or batched together.

src/event/format/json.rs (1)

345-351: Broaden error handling or accepted formats.
Currently, only RFC3339 or RFC2822 formats are parsed as timestamps. If more date-time formats appear in production logs, consider expanding or customizing the parsing strategy.
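
For instance, one extra chrono format could be chained onto the existing fallbacks like this (the additional format string is purely illustrative):

use chrono::{DateTime, FixedOffset};

// Try RFC3339 first, then RFC2822, then one additional custom layout.
fn parse_event_time(s: &str) -> Option<DateTime<FixedOffset>> {
    DateTime::parse_from_rfc3339(s)
        .or_else(|_| DateTime::parse_from_rfc2822(s))
        // Hypothetical extra format, e.g. "2025-03-01 12:34:56+0000"
        .or_else(|_| DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%z"))
        .ok()
}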

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 08fea7b and 9b0d865.

📒 Files selected for processing (3)
  • src/event/format/json.rs (7 hunks)
  • src/event/format/mod.rs (7 hunks)
  • src/event/mod.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (18)
src/event/mod.rs (3)

23-28: No issues with the new imports.
These imports are necessary for the functionality introduced below and seem properly scoped.


42-45: Validate change from u64 to usize.
Using usize could lead to potential inconsistencies across platforms (e.g., 32-bit vs. 64-bit). If the original intent was to store large sizes, confirm that usize remains safe in all target environments.

Also applies to: 48-48


82-96: Check rationale behind omitting stats & live-tail updates.
Unlike process(), this process_unchecked() method doesn’t call update_stats or LIVETAIL.process. Verify that this omission is intended and won’t cause data inconsistencies or missing real-time updates.

src/event/format/json.rs (8)

29-31: No concerns regarding these imports.
They provide necessary definitions for OpenTelemetry protos, standard collections, and other project modules.

Also applies to: 33-37, 40-50


140-154: Logic in to_data is clear and consistent.
Delegating to flatten_logs is a clean approach for abstracting JSON transformations.


156-213: Validate handling of new or unseen fields.
When schema inference fails due to new fields, the function returns an error. Consider whether partial schema updates or ignoring unknown fields might be appropriate in some scenarios.


215-229: Slice-based decode approach looks good.
Taking a slice of data instead of a single element is a more flexible design for creating a RecordBatch.


232-317: Beware potential partition key collisions.
The partition key includes a schema hash, optional time partition, and custom partition fields. For large-scale systems, confirm the combination sufficiently avoids collisions, or else concurrency issues and data overwrites might occur.


356-367: Confirm rejection of new fields is intended.
If “one new field” triggers None, existing data gets discarded. Check if partial schema extension is an option, or if a strict approach is desired.


369-373: Efficient collect_keys implementation.
Using a HashSet here is a clean and direct approach; it improves performance for large sets of keys.


375-382: Sufficient mismatch checks.
fields_mismatch ensures strict consistency with the existing schema, which is suitable for strongly typing logs.

src/event/format/mod.rs (7)

23-23: New import for NonZeroU32 is valid.
This nonzero type helps ensure that partition limits are never zero at runtime.


27-27: anyhow::anyhow import usage is acceptable.
No concerns; using anyhow is consistent across the updated code.


114-120: Schema inference signature extension looks consistent.
Accepting additional parameters like time_partition_limit and custom_partitions is logically aligned with the approach in json.rs.


124-155: Reserved field assertion is beneficial.
Returning an error if p_timestamp is already present as a column prevents conflicts or accidental overwriting of the timestamp field.


157-178: Solid approach in into_recordbatch.
Using update_field_type_in_schema before decoding ensures we apply consistent transformations (timestamp conversions, numeric casts, etc.) before finalizing the RecordBatch.


179-179: Method signature aligns with the rest of the refactor.
Accepting a stream reference fosters consistency with the JSON event logic.


237-319: Dynamic type overrides are well-structured.
Allowing “time” or “date”-named fields to become timestamps, and numbers to become float64 in Schema V1, makes sense for a more flexible ingestion experience.
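
In spirit, the heuristic amounts to something like the sketch below; the actual rules in the PR are richer, and the conditions and nullability here are simplified assumptions:

use arrow_schema::{DataType, Field, TimeUnit};
use serde_json::Value;

// Promote string fields whose names suggest a timestamp, widen integers to Float64
// under schema v1, and leave every other field untouched.
fn override_data_type(field: &Field, sample: &Value, schema_v1: bool) -> Field {
    let lowered = field.name().to_lowercase();
    let new_type = match (field.data_type(), sample) {
        (DataType::Utf8, Value::String(_)) if lowered.contains("time") || lowered.contains("date") => {
            DataType::Timestamp(TimeUnit::Millisecond, None)
        }
        (DataType::Int64 | DataType::UInt64, Value::Number(_)) if schema_v1 => DataType::Float64,
        (other, _) => other.clone(),
    };
    Field::new(field.name().clone(), new_type, true)
}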

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

78-141: 🛠️ Refactor suggestion

Robust log flattening with source-specific processing

The new flatten_logs method is a well-structured addition that cleanly handles different log formats based on the source type. The implementation properly uses type-specific flattening functions for various log sources.

However, there's a potential issue with the error handling:

Replace the unreachable! statement with a proper error return to avoid potential panics in production:

-            _ => unreachable!("flatten would have failed beforehand"),
+            other => return Err(anyhow!("Expected an object or an array but got: {:?}", other)),
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

237-245: Simplified schema retrieval with better error handling

The refactored get_schemas_if_present method is more concise and readable by directly iterating over schema files and using BufReader for efficient reading. However, there's a potential issue with the error handling in this method.

Consider handling potential errors when opening schema files instead of using expect, which could cause panics in production:

-            let file = File::open(path).expect("Schema File should exist");
+            let file = match File::open(path) {
+                Ok(file) => file,
+                Err(e) => {
+                    warn!("Failed to open schema file {}: {}", path.display(), e);
+                    continue;
+                }
+            };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9b0d865 and d096ce0.

📒 Files selected for processing (9)
  • src/event/format/json.rs (7 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/ingest.rs (9 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/handlers/http/mod.rs (0 hunks)
  • src/lib.rs (1 hunks)
  • src/metadata.rs (1 hunks)
  • src/parseable/mod.rs (1 hunks)
  • src/parseable/streams.rs (12 hunks)
💤 Files with no reviewable changes (1)
  • src/handlers/http/mod.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/lib.rs
  • src/metadata.rs
  • src/handlers/http/logstream.rs
  • src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (12)
src/parseable/streams.rs (2)

574-585: Well-designed schema merging implementation

The new commit_schema method provides a clean and efficient way to merge the incoming schema with the existing one, making the schema evolution process more maintainable. The implementation properly handles the merging and updating of metadata.


112-131: Improved code clarity with meaningful parameter names

Renaming schema_key to prefix enhances code readability by using a more descriptive name that better represents the parameter's purpose. The updated DiskWriter implementation is also more consistent with the pattern used elsewhere in the codebase.

src/event/mod.rs (4)

34-39: Good data structure design for partition events

The new PartitionEvent struct is a well-designed abstraction that cleanly encapsulates the relationship between a RecordBatch and its corresponding date, improving code readability and maintainability.


40-48: Enhanced Event structure for better partition management

The improved Event struct now uses a HashMap to map partition prefixes to PartitionEvent instances, which is a more elegant approach that better encapsulates the relationship between partitions and their data.


52-70: Streamlined event processing with better error handling

The updated process method now properly iterates over partition events, commits the schema only if it's the first event, and efficiently pushes data for each partition. This approach is more maintainable and aligns well with the new data structure design.


72-78: Consistent pattern for unchecked processing

The process_unchecked method follows the same pattern as the process method, maintaining consistency throughout the codebase. The method is concise and focused on its specific responsibility.

src/handlers/http/ingest.rs (3)

44-80: Simplified log ingestion with direct push_logs call

This refactoring significantly improves the ingest endpoint by:

  1. Using a direct method call to push_logs instead of complex flattening logic
  2. Centralizing log processing logic
  3. Enhancing code maintainability by reducing complexity at the HTTP handler level

The new approach provides a cleaner separation of concerns between HTTP handling and log processing.


236-260: Updated function signature for push_logs_unchecked

Accepting a &Stream parameter emphasizes the new ingestion pattern. Ensure concurrency safety whenever multiple threads attempt "unchecked" pushes simultaneously.

In highly concurrent deployments, consider locking or queueing events to avoid partial writes or race conditions in process_unchecked.

#!/bin/bash
# Look for any synchronization mechanisms around process_unchecked calls
rg -A 5 -B 5 "process_unchecked" --type rust

244-254:

❓ Verification inconclusive

Consider using more type-safe initialization for partitions

The initialization of PartitionEvent with hardcoded values for is_first_event (always true) seems potentially risky. This might lead to schema changes being committed unnecessarily if multiple threads are processing events concurrently.


🏁 Script executed:

#!/bin/bash
# Check if is_first_event is used for controlling schema commits in other places
rg "is_first_event.*commit_schema" --type rust

Length of output: 48


Action Required: Verify the Initialization of is_first_event in PartitionEvent

The current initialization hardcodes is_first_event as true, which might lead to unintended schema commits if multiple threads process events concurrently. Note that our search for usages linking is_first_event to schema commit operations (e.g. using patterns like commit_schema) did not return any results. However, this lack of evidence means we should manually verify if this field controls any critical behavior elsewhere in the codebase.

  • Review: Confirm whether is_first_event is intended to remain hardcoded or if a more robust, type-safe initialization is required.
  • Action: Manually verify (or add additional tests) that the current setup does not inadvertently trigger schema changes in concurrent scenarios.
src/event/format/json.rs (3)

149-162: Simplified interface for data conversion

The refactored to_data method now has a clearer contract, delegating the implementation details to the flatten_logs method. This separation of concerns enhances maintainability and readability.


394-397: Simplified key collection implementation

The collect_keys function has been simplified to directly collect keys into a HashSet, which is more concise and efficient than the previous implementation.


239-333:

❓ Verification inconclusive

Well-designed partition-aware event creation

The updated into_event method now properly handles partitioning based on schema keys, timestamps, and custom partition values. The implementation is thorough and correctly organizes data into appropriate partitions with their corresponding dates.

However, there's a potential issue with the prefix generation logic:


🏁 Script executed:

#!/bin/bash
# Check consistency of prefix generation across the codebase
rg "prefix = format!" --type rust

Length of output: 474


Prefix Generation Consistency — Please Verify Alignment

The updated into_event method is well structured and partition-aware, correctly organizing events by schema keys, timestamps, and custom partition values. The composite prefix generated in this method (using schema key, a formatted timestamp, and custom partition values) follows a clear intent.

However, a review of similar expressions across the codebase shows that:

  • src/event/format/json.rs builds a composite prefix via concatenation with dots.
  • src/utils/time.rs and src/parseable/streams.rs use format! with slightly different delimiters and structure (such as using folder separators like /).

Please verify that these differences in prefix formatting are intentional. If the prefixes are meant to follow the same structural conventions across modules, consider aligning the formatting logic to prevent potential partitioning mismatches.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/event/format/json.rs (1)

239-338: Improved event partitioning with clearer structure

The refactored into_event method now uses the Stream object to access all required parameters, which is a cleaner approach. The partitioning logic using JsonPartition is well-structured and makes the code more maintainable.

One potential improvement is to consider using a builder pattern for constructing the partitions, which could make the code even more readable, especially as the partitioning logic grows in complexity.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d096ce0 and e15b0d2.

📒 Files selected for processing (1)
  • src/event/format/json.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (22)
src/event/format/json.rs (22)

26-31: Enhanced imports for expanded functionality

The addition of Chrono date/time types and OpenTelemetry protobuf model imports properly supports the new log processing capabilities across multiple log sources.


39-53: Well-structured module organization

The imports are properly organized into logical groups, making dependencies clear. The codebase now directly uses important components like Stream and various flattening utilities for different log formats.


55-59: Appropriate addition of JsonPartition struct

The new JsonPartition struct effectively encapsulates related data (batch, schema, date) that was previously passed as separate parameters, improving code organization.


61-66: Enhanced Event struct with better source tracking

Good addition of log_source field to track the origin type of events, which supports the source-specific processing logic introduced in this PR.


69-76: Constructor updated to match new struct definition

The constructor properly initializes the new fields, maintaining backward compatibility with the existing API while supporting the new functionality.


78-141: Handle potential panic in unreachable code path

The flatten_logs function has a well-designed structure for handling different log sources. However, line 136 contains an unreachable! statement that could potentially panic in production if unexpected input is received.

Consider replacing the unreachable statement with a proper error handling:

-            _ => unreachable!("flatten would have failed beforehand"),
+            other => return Err(anyhow!("Expected an object or an array but got: {:?}", other)),

144-162: Clean simplification of to_data method

The method has been effectively simplified by delegating to the flatten_logs method, reducing code duplication and making the code more maintainable.


223-237: More efficient batch processing in decode method

The change to accept a slice of data rather than a single item allows for more efficient batch processing of events. This is a good optimization, especially for high-volume log ingestion.


392-394: Simplified key collection with HashSet

The collect_keys function now directly collects keys into a HashSet, which is appropriate for this use case, as it automatically handles duplicate keys and provides faster lookups.


397-403: Cleaner field mismatch detection

The updated fields_mismatch function is more concise and readable, using a cleaner approach to check for field validity.


495-513: Useful test helpers for type checking

The TestExt trait is a good addition that makes test assertions more readable and type-safe. This approach is much better than using raw type casting in the test code.


515-517: Utility function for test setup

The fields_to_map helper function effectively reduces repetition in the tests, making them more concise and easier to maintain.


519-550: Comprehensive test for basic object conversion

This test thoroughly verifies the conversion of a simple JSON object to a RecordBatch, correctly testing both schema inference and data conversion aspects.


552-579: Good testing of null value handling

The test properly verifies that null values are handled correctly in the conversion process, which is important for ensuring data integrity with real-world inputs.


581-614: Effective testing of schema derivation

This test properly verifies the schema derivation logic when a stored schema already exists, which is critical for ensuring schema evolution works correctly.


616-639: Important test for schema type mismatches

This test checks that the code correctly rejects inputs with types that don't match the schema, which is essential for ensuring data consistency.


641-664: Edge case testing for empty objects

Good test for handling empty JSON objects, which is an important edge case that could occur in production.


666-715: Thorough testing of array processing

This test effectively verifies that arrays of JSON objects are correctly processed into a RecordBatch with an inferred schema, including proper handling of missing fields.


717-812: Comprehensive null value handling in arrays

These tests thoroughly verify that null values in arrays of JSON objects are handled correctly, both with and without a pre-existing schema. This is critical for robustness with varied inputs.


814-918: Good testing of nested data structures

The test thoroughly verifies handling of nested data structures (arrays within objects), which is important for complex log formats.


920-978: Verification of schema version compatibilities

This test effectively verifies that the code handles different schema versions correctly, which is critical for backward compatibility as the system evolves.


164-221:

❓ Verification inconclusive

Simplification of schema inference logic

The code now better handles the case when no schema can be derived, with clearer error messaging and a more direct flow. The change from checking for None to using if let Some(schema) improves readability.

However, there's still a potential issue with collect_keys not handling empty sets explicitly. Consider adding a check to ensure the fields set isn't empty before proceeding.


🏁 Script executed:

#!/bin/bash
# Check how empty key sets are handled elsewhere in the codebase
rg "collect_keys.*empty|empty.*collect_keys" --type rust

Length of output: 58


Schema Inference Logic Improvement – Please Verify Empty Set Handling

The updated schema inference logic is clearer and easier to follow. The refactoring using if let Some(schema) has indeed improved readability and error messaging. However, the review comment’s concern regarding potential issues with collect_keys not explicitly handling empty sets remains unresolved. Our initial search for explicit handling (using rg "collect_keys.*empty|empty.*collect_keys") returned no results, so it’s unclear whether an empty fields set is adequately addressed elsewhere in the codebase.

  • Action Required: Please verify manually that when collect_keys returns an empty set, the downstream operations (such as schema derivation/merging) behave as intended. If the empty set could lead to runtime issues, consider adding an explicit check (and corresponding error handling or fallback behavior) before proceeding.

Signed-off-by: Devdutt Shenoi <[email protected]>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/handlers/http/cluster/utils.rs (2)

216-219: Add documentation for the JsonWithSize struct.

This is a well-designed struct that encapsulates JSON data with its size. Consider adding documentation comments to explain its purpose and usage, especially since it's a public struct that might be used by other developers.

+/// A wrapper struct that holds deserialized JSON data along with its original byte size.
+/// 
+/// This is useful for tracking payload sizes during request processing and can be used
+/// for monitoring and enforcing size limits.
 pub struct JsonWithSize<T> {
+    /// The deserialized JSON data
     pub json: T,
+    /// Size of the original JSON payload in bytes
     pub byte_size: usize,
 }

221-260: Well-implemented FromRequest trait with proper size checking.

The implementation correctly handles streaming JSON payloads, tracks their size, and enforces the size limit. The previously identified Clippy issues have been addressed properly.

Some suggestions for improvement:

  • Add documentation comments for the implementation
  • Consider adding debug logging for rejected oversized payloads
+/// Implements FromRequest for JsonWithSize<T> to allow automatic extraction and size tracking
+/// of JSON payloads from HTTP requests.
+/// 
+/// This implementation enforces the MAX_EVENT_PAYLOAD_SIZE limit and tracks the byte size
+/// of the original payload.
 impl<T: DeserializeOwned + 'static> FromRequest for JsonWithSize<T> {
     type Error = actix_web::error::Error;
     type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;

     fn from_request(_: &HttpRequest, payload: &mut Payload) -> Self::Future {
         let limit = MAX_EVENT_PAYLOAD_SIZE;

         // Take ownership of payload for async processing
         let mut payload = payload.take();

         Box::pin(async move {
             // Buffer to collect all bytes
             let mut body = BytesMut::new();
             let mut byte_size = 0;

             // Collect all bytes from the payload stream
             while let Some(chunk) = payload.next().await {
                 let chunk = chunk?;
                 byte_size += chunk.len();

                 // Check the size limit
                 if byte_size > limit {
+                    // Log rejection of oversized payload
+                    tracing::debug!("Rejected payload exceeding size limit: {}/{} bytes", byte_size, limit);
                     return Err(ErrorPayloadTooLarge(byte_size));
                 }

                 // Extend our buffer with the chunk
                 body.extend_from_slice(&chunk);
             }

             // Convert the collected bytes to Bytes
             let bytes = body.freeze();

             // Deserialize the JSON payload
             let json =
                 serde_json::from_slice::<T>(&bytes).map_err(JsonPayloadError::Deserialize)?;

+            tracing::trace!("Successfully processed JSON payload: {} bytes", byte_size);
             Ok(JsonWithSize { json, byte_size })
         })
     }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e15b0d2 and b106106.

📒 Files selected for processing (5)
  • src/handlers/http/cluster/mod.rs (3 hunks)
  • src/handlers/http/cluster/utils.rs (2 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/handlers/http/query.rs (1 hunks)
  • src/parseable/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/handlers/http/logstream.rs
  • src/parseable/mod.rs
  • src/handlers/http/cluster/mod.rs
  • src/handlers/http/query.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/handlers/http/cluster/utils.rs (1)

19-36: Good choice of imports for the new functionality.

The added imports support the new JSON payload handling functionality and are appropriately scoped.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/event/format/json.rs (1)

344-361: ⚠️ Potential issue

Risk of panic with .unwrap() in extract_custom_partition_values.
Using json.get(custom_partition_field.trim()).unwrap() can panic if the field is missing. To prevent runtime crashes, consider returning an error or skipping missing fields.

Here’s a possible fix:

- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(value) => value.to_owned(),
+     None => {
+         // Return error or skip missing value
+         return Err(anyhow!("Missing custom partition field: {}", custom_partition_field));
+     }
+ };
🧹 Nitpick comments (5)
src/event/format/json.rs (5)

55-60: Consider adding a doc comment to JsonPartition.
A brief doc comment about the purpose and usage of JsonPartition would aid readability and maintainability.


63-66: Check cross-platform portability for origin_size.
usize is only 32 bits wide on 32-bit targets, so origin_size would have a platform-dependent maximum and need casts wherever a fixed-width integer is expected. Consider using u64 instead for a consistent range across architectures.


164-221: Potential performance bottleneck in infer_schema.
Repeatedly inferring and merging schemas on large data volumes can be expensive. Consider caching or consolidating these merge operations to reduce overhead.


240-339: Evaluate partition prefix collisions.
When building the partition prefix with dynamic custom partition values, consider how collisions or naming conflicts might be handled. This may require adding user warnings or fallback logic.


464-979: Comprehensive test coverage.
These tests effectively cover arrays, null values, nested fields, and schema derivation. Consider adding a negative test for the unreachable!() branch within flatten_logs to ensure robust error handling for malformed inputs.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b106106 and b8606b3.

📒 Files selected for processing (2)
  • src/event/format/json.rs (7 hunks)
  • src/handlers/http/cluster/utils.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (13)
src/handlers/http/cluster/utils.rs (3)

19-34: Well-structured import additions for new functionality

The new imports are appropriately organized and provide the necessary components for handling request payloads, futures, byte manipulation, and JSON deserialization - all essential for the new JsonWithSize implementation.


215-218: Good design of the JsonWithSize<T> struct

This generic struct effectively pairs the deserialized JSON data with its byte size, which is a clean approach for tracking resource usage during event ingestion.
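
For orientation, the pairing can be as small as the following sketch (field names mirror the excerpt earlier in this thread; this is an illustration, not the PR's exact definition):

/// Carries a deserialized payload together with the number of bytes read off the wire.
pub struct JsonWithSize<T> {
    pub json: T,
    pub byte_size: usize,
}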


220-259: Well-implemented FromRequest trait for JSON payload handling

The implementation of FromRequest for JsonWithSize<T> is robust, featuring:

  • Proper streaming of request payload to handle large data efficiently
  • Size limit enforcement to prevent resource exhaustion
  • Clear error handling for size violations and deserialization issues
  • Effective use of async/await patterns

I notice the implementation correctly addresses the previous Clippy warnings by:

  1. Removing the redundant .into() conversion at line 242
  2. Using direct function reference rather than closure for the error mapping at line 254
src/event/format/json.rs (10)

26-54: Use statements verified.
No issues found with these new or updated imports; they appear logically consistent and relevant to the newly introduced functionalities.


69-76: Initialization approach looks good.
The Event::new function is straightforward and aligns with standard Rust best practices.


78-135: flatten_logs function structure is clear.
The function covers multiple log sources and consistently flattens them using specialized helpers. Error handling for unexpected data is mostly robust.


136-136: Relying on unreachable!() can cause panics in production.
This pattern was flagged in a past review. Consider returning a user-friendly error instead of panicking if a truly unexpected value is encountered.


144-163: Clean extension of to_data with new arguments.
The refactoring to accept time_partition_limit and custom_partitions is logical and helps keep the flattening logic self-contained.


224-237: Arrow decoding logic looks fine.
Using the ReaderBuilder with serialize and flush is a standard approach. No apparent issues.


365-372: Time extraction logic is appropriate.
Using serde_json::from_value to parse RFC3339 strings into DateTime<Utc> is a robust approach; error cases are handled properly.


377-388: Schema derivation is straightforward.
Returning None if a field is missing is a simple fallback that the caller handles gracefully. No significant issues here.


390-394: collect_keys improvement noted.
This no longer uses expect, addressing previous concerns about potential panics.


396-403: No concerns in fields_mismatch.
The check for missing or invalid field types appears clear and robust.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/parseable/staging/writer.rs (2)

41-44: Consider making struct fields private for better encapsulation.

While the DiskWriter struct itself is a good addition, consider making the fields private and providing accessor methods if needed to follow Rust's information hiding principles.

 pub struct DiskWriter {
-    pub inner: StreamWriter<BufWriter<File>>,
-    pub path: PathBuf,
+    inner: StreamWriter<BufWriter<File>>,
+    path: PathBuf,
 }

47-53: Good implementation of the constructor method.

The new method appropriately sets up the file with OpenOptions and creates a buffered writer. Consider specifying a buffer size for potential performance optimization.

 pub fn new(path: PathBuf, schema: &Schema) -> Result<Self, StagingError> {
     let file = OpenOptions::new().create(true).append(true).open(&path)?;
-
-    let inner = StreamWriter::try_new_buffered(file, schema)?;
+    // Use a specified buffer size for potential I/O performance optimization
+    let buf_writer = BufWriter::with_capacity(64 * 1024, file); // 64KB buffer
+    let inner = StreamWriter::try_new(buf_writer, schema)?;

     Ok(Self { inner, path })
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8606b3 and fbd3a18.

📒 Files selected for processing (1)
  • src/parseable/staging/writer.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (4)
src/parseable/staging/writer.rs (4)

21-21: Good use of imports for the new functionality.

The added imports support the new DiskWriter struct implementation, particularly the file operations and buffered writing capabilities.


29-33: Appropriate error handling and module dependencies.

Good addition of the tracing::error import for logging errors and proper organization of imports for ARROW_FILE_EXTENSION and error handling.


38-39: Good architectural improvement through encapsulation.

Changing from direct StreamWriter<File> to DiskWriter improves encapsulation and separation of concerns, making the code more maintainable.


55-57: Clean delegation to inner writer with proper error mapping.

This method appropriately delegates to the inner stream writer and maps the error to the domain-specific StagingError type.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/event/format/json.rs (1)

351-368: ⚠️ Potential issue

Unwrap may trigger panic for missing partition fields.
Line 356 calls .unwrap() on json.get(...). If the field is missing, it will panic. For robust ingestion, consider returning an error or skipping missing fields.

- let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+ let custom_partition_value = match json.get(custom_partition_field.trim()) {
+     Some(val) => val.to_owned(),
+     None => return Err(anyhow!(
+         "Missing custom partition field: {}",
+         custom_partition_field
+     )),
+ };
♻️ Duplicate comments (1)
src/event/format/json.rs (1)

109-137: ⚠️ Potential issue

Avoid panic on unexpected input.
Using _ => unreachable!("flatten would have failed beforehand") can cause a runtime panic if the code ever encounters an unexpected JSON type. Consider returning a structured error to handle malformed data gracefully.

- _ => unreachable!("flatten would have failed beforehand"),
+ other => return Err(anyhow!("Expected object or array, found: {:?}", other)),
🧹 Nitpick comments (3)
src/event/format/json.rs (3)

164-205: Schema inference logic is improved, but there's a TODO.
Your logic for merging inferred schema with stored schema is sound. Line 179 has a TODO: marker; consider clarifying or removing if no additional plan is needed.

Would you like help completing or removing the TODO?


330-346: Potential string-escaping for partition prefixes.
If users include special characters in partition fields, consider minimal escaping or sanitizing to avoid filesystem or path issues.


382-399: Doc comment mismatch.
The doc for derive_arrow_schema and collect_keys mentions returning None if one value isn't an object, but the implementation differs. Update doc comments to match the current logic.

- // Returns None if even one of the value is not an Object
+ // Collects field names from the JSON object, returning a HashSet.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbd3a18 and 592d3a1.

📒 Files selected for processing (3)
  • src/event/format/json.rs (6 hunks)
  • src/parseable/staging/writer.rs (2 hunks)
  • src/parseable/streams.rs (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (23)
src/parseable/streams.rs (5)

113-114: Parameter rename improves clarity

Changing from schema_key to prefix better reflects the role of this parameter, making the code more intuitive.


119-131: DiskWriter implementation enhances resource management

The code now uses the new DiskWriter struct instead of direct StreamWriter operations, which provides better encapsulation and automatic resource cleanup via the Drop trait.


240-245: Simplified schema file reading

Using BufReader for reading schema files is more efficient for large files. The code now directly attempts to read schema files without the initial directory check, which streamlines the implementation.


327-333: Improved flush implementation with explicit resource management

The updated flush method clearly separates memory clearing and disk flushing operations with helpful comments. Using std::mem::take to drop disk writers is an effective way to ensure all resources are properly released.
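
As a generic illustration of that take-to-drop idea (not the PR's code), swapping the map for an empty one moves every writer out, so each writer's Drop runs immediately:

use std::collections::HashMap;

fn drop_all_writers<W>(writers: &mut HashMap<String, W>) {
    // Replace the map with an empty one; the returned map owns every writer,
    // so dropping it here runs each writer's Drop (flush/close) right away.
    let owned = std::mem::take(writers);
    drop(owned);
}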


567-578: New commit_schema method improves schema management

This new method encapsulates the logic for merging a provided schema with the current schema and updating the metadata. The implementation is clean and maintains proper error handling.

src/parseable/staging/writer.rs (6)

45-48: Well-structured DiskWriter struct

The new DiskWriter struct effectively encapsulates the file writing operations with clear field definitions.


51-57: Clean implementation of the new method

The constructor properly initializes the writer with appropriate file options and error handling.


59-61: Concise write method with proper error mapping

The write method correctly maps arrow errors to your application's StagingError type, maintaining consistent error handling.


63-74: Error handling in finish method could be improved

The finish method logs errors but doesn't propagate them to the caller, which could lead to silent failures.


77-81: Excellent Drop trait implementation

Implementing the Drop trait ensures that resources are properly cleaned up when the DiskWriter goes out of scope, preventing resource leaks.
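
A minimal sketch of that pattern, assuming the arrow crate's ipc::writer::StreamWriter and the field layout shown in the earlier diff (not the PR's exact code):

use std::{fs::File, io::BufWriter, path::PathBuf};
use arrow::ipc::writer::StreamWriter;

struct DiskWriter {
    inner: StreamWriter<BufWriter<File>>,
    path: PathBuf,
}

impl Drop for DiskWriter {
    fn drop(&mut self) {
        // Finalize the Arrow IPC stream on scope exit; report instead of panicking inside Drop.
        if let Err(err) = self.inner.finish() {
            eprintln!("failed to finalize {}: {err}", self.path.display());
        }
    }
}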


42-43: Updated Writer struct field improves resource management

Changing the disk field to use DiskWriter instead of StreamWriter directly improves encapsulation and ensures proper resource cleanup.

src/event/format/json.rs (12)

26-37: Imports look fine.
No issues spotted with the additional imports for chrono, opentelemetry_proto, and various standard library collections.


40-53: Module references are consistent with the refactor.
These references to super modules and crate internals align well with the broader event-handling refactor.


55-66: Good introduction of JsonPartition struct.
Storing batched JSON, schema, and date helps organize partitions for ingestion.


69-84: Constructor changes look good.
The additional fields (origin_size and log_source) provide better tracking of upstream metadata.


85-107: Source-based flattening is well-structured.
Deserialization for each log source is straightforward, and error handling is appropriate.


144-146: Data alias and to_data function.
These changes align with the new approach of converting JSON into a vector of flattened logs.

Also applies to: 152-162


212-219: Field mismatch check is straightforward.
Your mismatch error for incompatible data types is clearly reported.


223-223: Decoding improvements.
Increasing the batch size capacity should improve throughput, and handling the flush result explicitly avoids silently dropping buffered rows.

Also applies to: 231-231


240-327: Refined into_event logic.
Partitioning events by prefix, date, and custom partitions is a neat approach to batch record creation. This helps align ingestion with user-defined grouping.


372-379: Time extraction is good.
Graceful error handling on missing or invalid fields ensures robust parsing.


403-465: Validation functions are thorough.
Overall logic checks for a wide range of DataType–JSON matchups, providing robust schema enforcement.


471-1033: Unit tests cover multiple scenarios effectively.
The variety of JSON shapes, null values, type mismatches, and partition logic ensures good confidence in the refactored code.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/event/format/json.rs (1)

78-141: 💡 Verification agent

❓ Verification inconclusive

Robust handling for different log sources with a potential edge case

The new flatten_logs method nicely handles different log sources with specific processing for each type. This improves maintainability by centralizing all flattening logic in one place.

However, line 136 uses unreachable!("flatten would have failed beforehand") which could potentially panic in production if an unexpected JSON structure is encountered that passes the earlier checks.

-            _ => unreachable!("flatten would have failed beforehand"),
+            _ => return Err(anyhow!("Unexpected JSON value type: expected object or array")),

This ensures graceful error handling even for unexpected edge cases.


Action Required: Replace unreachable! with Explicit Error Handling

The new flatten_logs method centralizes the flattening logic well. However, using unreachable!("flatten would have failed beforehand") on line 136 risks a production panic if an unexpected JSON value slips through the earlier validations. Instead, return an error to gracefully handle such cases.

Recommended change:

-            _ => unreachable!("flatten would have failed beforehand"),
+            _ => return Err(anyhow!("Unexpected JSON value type: expected object or array")),

This change ensures that even in unforeseen scenarios, the function returns an error rather than panicking.

🧹 Nitpick comments (1)
src/event/format/json.rs (1)

164-173: Enhanced schema inference with potential empty fields concern

The infer_schema method is well-structured, but there's a potential issue: the call to collect_keys at line 172 doesn't check if the returned set might be empty (e.g., if processing an empty object).

Consider adding validation for empty field sets:

-        let fields = collect_keys(data);
+        let fields = collect_keys(data);
+        if fields.is_empty() {
+            // Either log a warning or handle this case specifically
+            // Example: return Err(anyhow!("No fields found in the data"))
+        }

Validating this will help prevent potential issues downstream when an empty schema is used.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6bb7afc and 924ef5b.

📒 Files selected for processing (8)
  • src/connectors/kafka/processor.rs (4 hunks)
  • src/event/format/json.rs (6 hunks)
  • src/event/format/mod.rs (8 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/ingest.rs (9 hunks)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/metadata.rs (1 hunks)
  • src/parseable/streams.rs (12 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/metadata.rs
  • src/handlers/http/logstream.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (33)
src/parseable/streams.rs (4)

575-586: Good implementation of schema evolution with commit_schema

This new method provides a clean, dedicated way to merge schemas and update metadata. The implementation correctly handles schema evolution by merging the current schema with the new one and updating the metadata accordingly.

The use of try_merge for schema evolution is appropriate, as it preserves field compatibility while allowing for schema evolution.
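
A rough sketch of the merge step itself, using arrow's Schema::try_merge (the stream and metadata plumbing around it belong to the PR and are omitted):

use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;

fn merge_into_current(current: &Schema, incoming: Schema) -> Result<Arc<Schema>, ArrowError> {
    // try_merge unions compatible fields and errors on conflicting data types,
    // which is what makes it suitable for additive schema evolution.
    let merged = Schema::try_merge([current.clone(), incoming])?;
    Ok(Arc::new(merged))
}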


118-119: Improved clarity with renamed parameter and updated DiskWriter implementation

Changing schema_key to prefix improves code understandability, as it better reflects the parameter's purpose. The updated implementation using DiskWriter instead of StreamWriter streamlines the record writing process.

The match pattern with improved error handling and file creation flow is now more robust and easier to follow.

Also applies to: 125-141


248-254: Simplified schema retrieval logic

The refactored get_schemas_if_present method removes redundant checks and streamlines the file reading process with BufReader, which improves performance when reading schema files.


150-156: Good extraction of filename generation into dedicated method

The extraction of filename generation logic into filename_from_prefix promotes code reusability and improves maintainability.

src/connectors/kafka/processor.rs (2)

41-77: Streamlined Kafka record processing with direct stream ingestion

The refactored method process_event_from_chunk improves the processing flow by:

  1. Directly pushing logs to the stream instead of creating an intermediate event object
  2. Returning the payload size, which is more meaningful for logging/tracking
  3. Simplifying the code path with fewer transformations

This change aligns with the overall event handling refactor and reduces complexity.


86-88: Updated method call with improved logging

The call to the renamed method correctly references the new return type, logging the processed size instead of just the record count, which provides more meaningful metrics.

src/event/mod.rs (4)

35-39: Good encapsulation with new PartitionEvent struct

The new PartitionEvent struct effectively encapsulates the record batch and its parsed timestamp, improving the organization of event data.


41-47: Improved event structure with partition-based approach

Replacing individual fields with a partition-based HashMap improves the Event struct by:

  1. Better organizing related data (record batches and timestamps)
  2. Supporting multi-partition events more naturally
  3. Simplifying iteration over partitions during processing

The change to usize for origin_size is also more appropriate for memory sizing.
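
The shape described above might look roughly like this; the field names are illustrative guesses, not copied from the PR:

use std::collections::HashMap;
use arrow::record_batch::RecordBatch;
use chrono::NaiveDateTime;

// One record batch plus the timestamp it was parsed under, keyed by partition prefix.
struct PartitionEvent {
    record_batch: RecordBatch,
    parsed_timestamp: NaiveDateTime,
}

struct Event {
    origin_size: usize,
    partitions: HashMap<String, PartitionEvent>,
}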


51-75: Enhanced event processing with stream-specific schema commitment

The refactored process method now:

  1. Iterates cleanly over partitions
  2. Commits the schema to the stream when needed (first event)
  3. Continues to update statistics and livetail functionality

Passing the stream as a parameter provides better encapsulation and clearer dependencies.


78-87: Consistent updates to process_unchecked method

The changes to process_unchecked appropriately mirror the partition-based approach of the main process method, maintaining consistency in the API.

src/handlers/http/ingest.rs (6)

49-52: Simplified log ingestion with direct push_logs call

The refactored ingest function simplifies the ingestion flow by:

  1. Using JsonWithSize to capture both the JSON payload and its size upfront
  2. Obtaining the stream directly with get_or_create_stream
  3. Creating and processing events in a clear, direct manner

This approach provides better separation of concerns between HTTP handling and log processing logic.

Also applies to: 85-90


97-100: Consistent pattern adoption across OTEL ingestion endpoints

The OTEL logs ingestion endpoint now follows the same pattern as the main ingestion endpoint, creating a more consistent codebase.

Also applies to: 130-135


142-145: Consistent pattern for OTEL metrics ingestion

The OTEL metrics ingestion endpoint follows the same simplified pattern, maintaining consistency across all ingestion endpoints.

Also applies to: 172-177


184-187: Consistent pattern for OTEL traces ingestion

The OTEL traces ingestion endpoint also adopts the simplified pattern, completing the consistent implementation across all ingestion endpoints.

Also applies to: 216-221


228-232: Direct event processing in post_event endpoint

The post_event endpoint now follows the same simplified pattern as other ingestion endpoints, improving consistency and maintainability.

Also applies to: 269-274


279-299: Updated function signature for push_logs_unchecked

The function now accepts a &Stream parameter instead of a stream name, emphasizing the new ingestion pattern. The implementation correctly uses the new PartitionEvent structure to match the refactored event handling system.

In highly concurrent deployments, consider locking or queueing events to avoid partial writes or race conditions in process_unchecked.

src/event/format/json.rs (9)

149-162: Well-designed method delegation to improve modularity

The refactored to_data method now delegates to the new flatten_logs method, which nicely separates concerns and allows for better code organization. This approach makes the code more maintainable by avoiding duplication and keeping related functionality together.


224-237: Improved batch processing for record creation

The decode method now accepts a slice of data objects rather than a single item, allowing for more efficient batch processing. This change aligns well with Arrow's batch-oriented processing model and should improve performance when handling multiple records.
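
A sketch of that slice-based, serialize-then-flush decoding with arrow's JSON decoder; batch sizing and error handling are simplified here and may differ from the PR:

use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::Schema;
use serde_json::{Map, Value};

fn decode(rows: &[Map<String, Value>], schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
    let mut decoder = ReaderBuilder::new(schema)
        // size the batch to the slice so a single flush can emit everything
        .with_batch_size(rows.len().max(1))
        .build_decoder()?;
    decoder.serialize(rows)?;
    decoder
        .flush()?
        .ok_or_else(|| anyhow::anyhow!("no rows were decoded"))
}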


240-323: Well-structured event partitioning with clear logic

The refactored into_event method now effectively:

  1. Extracts configuration from the stream
  2. Processes each JSON object in the data
  3. Groups records by partition prefix
  4. Creates record batches for each partition

This approach handles potentially large record sets efficiently by organizing them into logical partitions. The structure is clean and maintainable, with good separation of concerns between data retrieval, schema inference, and partition generation.


335-351: Clean prefix generation logic

The new generate_prefix function provides a clear, reusable way to create storage prefixes that incorporate schema, timestamp, and custom partition information. This improves maintainability by centralizing the prefix format in one place.
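
A hypothetical prefix builder along those lines; the exact separators and bucket granularity in the PR may differ:

use chrono::{DateTime, Timelike, Utc};

fn generate_prefix(schema_key: &str, time: DateTime<Utc>, custom: &[(String, String)]) -> String {
    // Combine schema identity, timestamp buckets and custom partition values
    // into a single storage prefix.
    let mut prefix = format!(
        "{schema_key}.date={}.hour={:02}.minute={:02}",
        time.format("%Y-%m-%d"),
        time.hour(),
        time.minute(),
    );
    for (key, value) in custom {
        prefix.push_str(&format!(".{key}={value}"));
    }
    prefix
}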


403-406: Simplified key collection with improved API

The collect_keys function has been simplified to directly return a HashSet<&str> instead of a more complex Result type. This makes the code more concise and easier to use.
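
An approximation of that simplified helper, gathering every field name across the flattened objects:

use std::collections::HashSet;
use serde_json::{Map, Value};

fn collect_keys<'a>(objects: &'a [Map<String, Value>]) -> HashSet<&'a str> {
    objects
        .iter()
        .flat_map(|obj| obj.keys().map(String::as_str))
        .collect()
}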


408-420: Cleaner schema validation approach

The fields_mismatch function has been refactored to use a more functional approach with any(). This makes the validation logic more concise and easier to understand.


566-584: Useful test utilities for Arrow arrays

The TestExt trait provides convenient methods for working with Arrow arrays in tests, making assertions more readable and concise. This is a good practice for test code maintainability.


586-589: Helpful schema utility for tests

The fields_to_map function simplifies creating a HashMap from Field objects, which is useful for test setup. This improves test readability and reduces duplication.


591-1096: Comprehensive test suite covering core functionality

The extensive test suite thoroughly covers different aspects of the refactored code:

  • Basic object processing
  • Schema derivation and validation
  • Array handling
  • Handling null values
  • Nested type handling
  • Schema mismatch detection
  • Partition prefix generation

This comprehensive testing is crucial for ensuring the refactored code works as expected across various scenarios.

src/event/format/mod.rs (8)

123-129: Streamlined method signature with clear parameters

The to_data method signature has been improved to:

  1. Take more specific parameters (time_partition_limit and custom_partitions)
  2. Return a cleaner result type (anyhow::Result<Vec<Self::Data>>)

This makes the API more focused and the error handling more consistent.


131-138: Good separation of concerns with schema inference

Adding the dedicated infer_schema method separates the schema inference logic from data conversion. This improves maintainability by providing a clear place for schema validation and derivation logic.


139-139: Enhanced batch processing capability

The decode method now accepts a slice of data items rather than a single item, enabling more efficient batch processing. This aligns with Arrow's batch-oriented processing model.


142-172: Simplified schema validation with modern Rust patterns

The refactored prepare_and_validate_schema method uses the is_none_or pattern for cleaner validation logic. This makes the code more concise while maintaining readability.

The explicit Ok(schema) return also makes the success path clearer.
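
A tiny, standalone illustration of the is_none_or idiom (stable since Rust 1.82), detached from the PR's actual validation code:

fn within_limit(limit: Option<usize>, observed: usize) -> bool {
    // true when no limit is configured, or when the observed value satisfies it
    limit.is_none_or(|max| observed <= max)
}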


174-194: Well-encapsulated record batch creation

The new into_recordbatch method cleanly encapsulates the process of:

  1. Creating a schema from fields
  2. Updating field types
  3. Decoding data into a record batch
  4. Setting the timestamp column

This makes the record batch creation logic more maintainable and reusable.


196-196: Simplified stream-based event creation

The into_event method signature has been simplified to take a stream reference, which is a cleaner approach that gives the implementation access to all the configuration it needs from a central place.


254-254: Type consistency with JSON handling

Changing the log_records parameter type from Option<&Vec<Value>> to Option<&Json> maintains consistency with the JSON handling approach used throughout the codebase.


297-297: Consistent parameter typing

The log_record parameter type in override_data_type has been changed to use the Json type alias consistently with other parts of the codebase. This improves type consistency across the application.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/event/format/mod.rs (1)

296-299: Improved type safety with custom JSON type

Changed log_record parameter from Value to Json and updated the matching logic to use this type. This change:

  1. Provides better type safety and specialized methods for JSON processing
  2. Makes the code more maintainable by using a custom type that can evolve with application needs

Consider documenting the advantages of the Json type over Value in comments to help future developers understand the rationale.

Also applies to: 306-306
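
One plausible definition of the alias, inferred from how it is used in these comments; the PR's actual definition may differ:

use serde_json::{Map, Value};

pub type Json = Map<String, Value>;

// With the alias, helpers read as operating on a JSON object rather than an arbitrary Value.
fn field_names(record: &Json) -> Vec<&str> {
    record.keys().map(String::as_str).collect()
}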

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 924ef5b and 3d8c338.

📒 Files selected for processing (1)
  • src/event/format/mod.rs (9 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (9)
src/event/format/mod.rs (9)

23-23: Updated imports to support new functionality

The imports have been updated to support the new event handling approach:

  • Added NonZeroU32 for time partition limits with validation
  • Switched to anyhow for more ergonomic error handling
  • Added utility imports for JSON handling and Arrow operations

These changes align well with the refactoring objectives to improve the event handling system.

Also applies to: 27-27, 36-40


99-100: Improved code semantics with type alias

The IsFirstEvent type alias makes the code more self-documenting and readable by clearly indicating the meaning of the boolean value in function signatures.
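
The alias is most likely just a named bool; even so, it documents intent at call sites (sketch, not copied from the PR):

type IsFirstEvent = bool;

fn log_schema_commit(stream: &str, is_first: IsFirstEvent) {
    // A bare `bool` here would not convey what `true` means; the alias does.
    if is_first {
        println!("committing initial schema for stream {stream}");
    }
}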


123-129: Improved signature for to_data method

The signature changes in to_data method indicate a significant refactoring:

  1. Added support for partition limits and custom partitions
  2. Changed return type to return multiple data items (Vec<Self::Data>)
  3. Switched to anyhow::Result for better error handling

This change appears to be part of an effort to improve the handling of large datasets by allowing batch processing and customized partitioning.


131-138: Added schema inference as a separate concern

The new infer_schema method is a good example of separation of concerns - extracting schema inference logic from data processing. This will make the code more maintainable and easier to test.

This approach allows for more flexible schema handling and will likely improve the performance of schema validation operations.


139-139: Updated decode method for batch processing

The decode method now accepts a slice of data items rather than a single item, enabling batch processing of events. This change should improve performance for high-volume log ingestion scenarios.


141-172: Simplified schema validation with inline logic

The previous separate is_schema_matching method has been replaced with inline schema validation logic that:

  1. Is more concise and easier to understand
  2. Directly expresses the validation condition
  3. Uses modern Rust pattern with is_none_or for cleaner code

This change improves readability while maintaining the same functionality.


174-195: Restructured into_recordbatch for clarity and flexibility

The into_recordbatch method has been completely restructured to:

  1. Take explicit parameters rather than using self
  2. Accept preprocessed data and schema
  3. Focus solely on converting data to a RecordBatch

This change follows good design principles by making the function more modular and single-purpose, which should make the code easier to maintain and test.


197-197: Added into_event method for type conversion

The new into_event method completes the event processing pipeline by providing a way to convert from the format-specific data type to the general Event type.

This addition makes the trait more comprehensive and ensures a complete processing flow from raw data to structured events.


251-256: Updated JSON handling with custom type

The parameter type change from Option<&Vec<Value>> to Option<&Json> and the corresponding implementation changes suggest a transition to a more specialized JSON handling approach.

This change likely provides better performance and more specific functionality for JSON processing, which is important for a log processing system.

Also applies to: 266-269
