
feat: add custom fields to events #1228

Open · wants to merge 3 commits into base: main
Conversation

nikhilsinhaparseable (Contributor) commented Mar 7, 2025

p_user_agent - fetched from the User-Agent request header
p_src_ip - fetched from the connection info of the request

Users can add additional headers to the ingest APIs in the format `x-p-<key-name>: <value>`, e.g. `x-p-environment: dev`.

The server then adds an `environment` field to each event with the value `dev`. Multiple custom headers can be added, and each is inserted as a separate field in the event.
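
For illustration, a request of the following shape would produce events carrying environment and region fields. The host, port, credentials, and stream name below are assumptions for the example, not values taken from this PR:

curl -X POST http://localhost:8000/api/v1/ingest \
  -H "Authorization: Basic YWRtaW46YWRtaW4=" \
  -H "X-P-Stream: demo" \
  -H "x-p-environment: dev" \
  -H "x-p-region: us-east-1" \
  -H "Content-Type: application/json" \
  -d '[{"level": "info", "message": "service started"}]'

Each resulting event would then also carry p_timestamp, p_user_agent, and p_src_ip alongside the two custom fields.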

Summary by CodeRabbit

  • New Features

    • Extended event processing and log ingestion to support customizable fields.
    • Now extracts additional header information, including user agent and source IP, for enhanced logging.
  • Refactor

    • Streamlined schema handling by integrating custom fields directly into records.
    • Improved header access by standardizing key references for better consistency.

coderabbitai bot commented Mar 7, 2025

Walkthrough

This pull request extends the event processing functionality by introducing a new parameter for custom fields across various modules. The changes update the method signatures for event formatting, ingestion, and log processing functions to accept an additional HashMap of custom fields. Furthermore, constants for user agent and source IP tracking are added, and helper functions are introduced to extract these fields from HTTP requests and integrate them into record batches.

Changes

  • src/event/format/json.rs, src/event/format/mod.rs: Added a new p_custom_fields parameter to the into_event and into_recordbatch methods of the EventFormat trait. Removed the manual timestamp field insertion in favor of add_parseable_fields for schema creation.
  • src/event/mod.rs: Introduced the constants USER_AGENT_KEY and SOURCE_IP_KEY for identifying user agent and source IP information (a sketch follows this list).
  • src/handlers/http/audit.rs, src/handlers/http/ingest.rs: Updated the audit middleware to use the USER_AGENT constant instead of a literal string. Modified several ingestion endpoints to support custom fields extraction and updated their return types to Result<HttpResponse, PostError>.
  • src/handlers/http/modal/utils/ingest_utils.rs: Added the function get_custom_fields_from_header to extract custom fields from HTTP request headers. Updated the signatures of flatten_and_push_logs and push_logs to include the new custom fields parameter.
  • src/utils/arrow/mod.rs: Added the public function add_parseable_fields, which rebuilds a record batch with a timestamp field prepended and custom fields appended to the schema. Updated imports to support creating string arrays and fields for the custom data.
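
For reference, the new constants summarized above plausibly look like the following sketch; the exact string values are assumptions inferred from the p_user_agent and p_src_ip field names in the PR description:

// Sketch of the src/event/mod.rs additions; values are assumed, not quoted.
pub const USER_AGENT_KEY: &str = "p_user_agent";
pub const SOURCE_IP_KEY: &str = "p_src_ip";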

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant IngestHandler as Ingest Handler
    participant Utils as Ingest Utils
    participant Event as EventFormat
    participant Arrow as Arrow Utils

    Client->>IngestHandler: HTTP Request (with headers)
    IngestHandler->>Utils: get_custom_fields_from_header(req)
    Utils-->>IngestHandler: custom fields
    IngestHandler->>Event: call into_event(..., custom_fields)
    Event->>Event: call into_recordbatch(..., custom_fields)
    Event->>Arrow: call add_parseable_fields(rb, timestamp, custom_fields)
    Arrow-->>Event: modified RecordBatch
    Event-->>IngestHandler: processed event
    IngestHandler-->>Client: HttpResponse

Possibly related PRs

Suggested reviewers

  • de-sh

Poem

I hopped through lines of code today,
Adding custom fields along the way.
Timestamps and headers now dance in line,
Each log and record perfectly align.
With bytes and hops, my day is bright 🐰,
Celebrating changes that feel just right!
For every sprint, I leap with delight.


coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/utils/arrow/mod.rs (1)

144-183: Well-implemented function for adding custom fields to record batches

This function is well-structured and handles the addition of custom fields properly:

  • Correctly preserves the original record batch data
  • Inserts timestamp at the beginning
  • Sorts custom fields by key for consistent ordering
  • Handles field type definitions appropriately

One minor optimization suggestion would be to pre-allocate the vectors with the correct capacity.

You could optimize vector allocations by pre-calculating the final size:

-    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
-    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
+    let final_size = schema.fields().len() + 1 + p_custom_fields.len();
+    let mut fields: Vec<Field> = Vec::with_capacity(final_size);
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(final_size);
+    
+    // Add existing fields and columns
+    fields.extend(schema.fields().iter().map(|f| f.as_ref().clone()));
+    columns.extend(rb.columns().to_vec());
src/handlers/http/modal/utils/ingest_utils.rs (1)

163-168: Consider adding validation for custom header keys.

While the implementation correctly extracts headers with the x-p- prefix, consider adding validation to ensure the trimmed keys follow any specific naming restrictions your schema might have.

 if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
     if let Ok(value) = header_value.to_str() {
         let key = header_name.trim_start_matches("x-p-");
+        // Validate key format - alphanumeric and underscore only
+        if key.chars().all(|c| c.is_alphanumeric() || c == '_') {
             p_custom_fields.insert(key.to_string(), value.to_string());
+        }
     }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 10aef7b and bc70091.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (19 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (22)
src/event/mod.rs (1)

38-39: New constants for custom field keys

The addition of these constants establishes standardized keys for user agent and source IP tracking in event data. This enhances code consistency across the application.

src/handlers/http/audit.rs (2)

26-26: Improved header import for HTTP standards

Adding the import for the standard HTTP header constant improves code maintainability.


89-89: Using standard HTTP header constant instead of string literal

Good change - replacing the string literal with the standard HTTP header constant improves code maintainability and reduces the risk of typos.

src/utils/arrow/mod.rs (1)

43-61: Updated imports to support custom fields functionality

The import changes properly support the new functionality for handling custom fields in record batches.

src/event/format/json.rs (2)

151-151: Added parameter for custom fields

The interface has been extended to support custom fields in events, which aligns with the PR objective.


171-171: Passing custom fields to record batch creation

Successfully integrated the custom fields parameter into the existing workflow.

src/handlers/http/modal/utils/ingest_utils.rs (5)

19-20: Good addition of HashMap import for custom fields functionality.

This import is necessary for the new functionality to handle custom fields.


31-31: Good usage of constants for special field keys.

Using constants for special field keys like SOURCE_IP_KEY and USER_AGENT_KEY improves code maintainability.


46-46: Good use of constants for ignored headers.

Defining which headers to ignore is a good practice to maintain consistency and avoid redundant fields.


52-52: Appropriate signature update to accept custom fields.

The function signature has been correctly updated to accept custom fields as a parameter.


146-171: Well-implemented custom fields extraction function.

The implementation is robust, handling:

  1. User agent extraction with fallback to default
  2. Source IP retrieval with default handling
  3. Custom header extraction with proper prefix filtering

It properly follows the PR's requirement of handling x-p-<key-name> headers.

src/handlers/http/ingest.rs (8)

41-41: Good import update to include the new custom fields function.

The imports have been correctly updated to include the new get_custom_fields_from_header function.


75-77: Correctly implemented custom fields extraction in the ingest handler.

The code properly extracts custom fields from the request and passes them to the flatten_and_push_logs function.


98-98: Empty HashMap provided for internal stream ingestion.

For internal streams, an empty HashMap is provided which is appropriate since internal streams don't need custom fields from HTTP headers.


128-130: Consistent implementation of custom fields across handlers.

The custom fields extraction and usage is consistently implemented across all handlers.


162-164: Custom fields properly implemented in metrics ingestion handler.

The implementation matches the pattern established in other handlers, maintaining consistency.


192-194: Custom fields properly implemented in traces ingestion handler.

The implementation follows the same pattern as other handlers, ensuring consistency.


243-244: Custom fields implementation in post_event handler.

The implementation correctly extracts and uses custom fields in the post_event handler.


395-400: Test case updated with custom fields parameter.

The test has been properly updated to include the new required parameter, using an empty HashMap.

src/event/format/mod.rs (3)

118-118: Correct trait signature update for custom fields.

The EventFormat trait has been properly updated to accept custom fields.


139-142: Improved implementation using add_parseable_fields.

The code now uses the add_parseable_fields utility function to add both timestamp and custom fields to the record batch. This is a cleaner approach than the previous implementation.


179-179: Consistent interface update for into_event method.

The into_event method signature has been updated to match the changes in into_recordbatch, maintaining consistency.

coderabbitai bot previously approved these changes Mar 7, 2025
coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/utils/arrow/mod.rs (1)

204-270: 💡 Verification agent

❓ Verification inconclusive

Missing tests for the new add_parseable_fields function.

While there are tests for other functions in this module, there are no tests specifically for the newly added add_parseable_fields function. Consider adding tests that verify:

  1. Custom fields are correctly added to the RecordBatch
  2. The timestamp is correctly inserted
  3. Error cases are properly handled
  4. Behavior when the input RecordBatch is empty
  5. Behavior with empty custom fields

🏁 Script executed:

#!/bin/bash
# Check if there are any tests for the add_parseable_fields function in the codebase
rg -l "test.*add_parseable_fields" --type rust

Length of output: 48


Actionable: Add tests for the new add_parseable_fields function

After verifying the repository, there are no tests referencing add_parseable_fields. Please add tests covering the following aspects:

  • Custom fields are correctly added to the RecordBatch.
  • The timestamp is correctly inserted.
  • Error cases are properly handled.
  • Behavior when the input RecordBatch is empty.
  • Behavior with empty custom fields.
🧹 Nitpick comments (1)
src/utils/arrow/mod.rs (1)

144-192: The implementation correctly adds custom fields to RecordBatches.

The add_parseable_fields function properly handles the addition of a timestamp and custom fields to the RecordBatch. The sorting of keys ensures consistent field order, and the implementation correctly maintains the Arrow memory model with appropriate use of Arc.

Consider these improvements:

  1. Add tests for this new function to verify behavior with various inputs
  2. Check if a timestamp field already exists in the schema before inserting it
  3. Add documentation for possible errors that could be returned
 pub fn add_parseable_fields(
     rb: RecordBatch,
     p_timestamp: DateTime<Utc>,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<RecordBatch, ArrowError> {
     // Return Result for proper error handling
+    // Possible errors:
+    // - Schema mismatch if fields cannot be combined
+    // - Memory allocation errors for large batches

     // Add custom fields in sorted order
     let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
     sorted_keys.sort();

     let schema = rb.schema();
     let row_count = rb.num_rows();

+    // Check if timestamp field already exists
+    let timestamp_field_exists = schema.fields().iter().any(|f| f.name() == DEFAULT_TIMESTAMP_KEY);
+
     let mut fields = schema
         .fields()
         .iter()
         .map(|f| f.as_ref().clone())
         .collect_vec();
-    fields.insert(
-        0,
-        Field::new(
-            DEFAULT_TIMESTAMP_KEY,
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        ),
-    );
+    if !timestamp_field_exists {
+        fields.insert(
+            0,
+            Field::new(
+                DEFAULT_TIMESTAMP_KEY,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        );
+    }
     fields.extend(
         sorted_keys
             .iter()
             .map(|k| Field::new(*k, DataType::Utf8, true)),
     );

     let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
-    columns.insert(
-        0,
-        Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
-    );
+    if !timestamp_field_exists {
+        columns.insert(
+            0,
+            Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
+        );
+    }
     columns.extend(sorted_keys.iter().map(|k| {
         let value = p_custom_fields.get(*k).unwrap();
         Arc::new(StringArray::from_iter_values(
             std::iter::repeat(value).take(row_count),
         )) as ArrayRef
     }));

     // Create the new schema and batch
     let new_schema = Arc::new(Schema::new(fields));
     RecordBatch::try_new(new_schema, columns)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bc70091 and 5f746eb.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (19 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/event/mod.rs
  • src/handlers/http/audit.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/event/format/mod.rs
  • src/event/format/json.rs
  • src/handlers/http/ingest.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (1)
src/utils/arrow/mod.rs (1)

43-48: Imports are appropriately updated for the new functionality.

The updated imports correctly include the necessary types for handling custom fields: HashMap for storing key-value pairs, and Arrow types like StringArray, ArrowError, DataType, Field, and TimeUnit for creating and manipulating Arrow data structures.

coderabbitai bot previously approved these changes Mar 13, 2025
coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)

146-156: Good implementation for extracting HTTP headers.

The function correctly extracts the User-Agent and source IP from the request. Consider adding error logging for cases where these values cannot be extracted.

 pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
     let user_agent = req
         .headers()
         .get("User-Agent")
         .and_then(|a| a.to_str().ok())
-        .unwrap_or_default();
+        .unwrap_or_else(|| {
+            log::debug!("User-Agent header not found or not valid UTF-8");
+            ""
+        });

     let conn = req.connection_info().clone();

-    let source_ip = conn.realip_remote_addr().unwrap_or_default();
+    let source_ip = conn.realip_remote_addr().unwrap_or_else(|| {
+        log::debug!("Source IP not found in connection info");
+        ""
+    });

157-158: Consider conditional inclusion of empty values.

The current implementation always adds User-Agent and source IP fields, even when they're empty. You might want to add them only when they contain non-empty values.

-    p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
-    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+    if !user_agent.is_empty() {
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
+    }
+    if !source_ip.is_empty() {
+        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+    }

161-169: Check for duplicate header names.

If multiple headers with the same name (after stripping "x-p-") are present, the last one will overwrite previous values. Consider logging a warning when this happens, or implement a strategy to handle duplicates (e.g., comma-separated values).

             if let Ok(value) = header_value.to_str() {
                 let key = header_name.trim_start_matches("x-p-");
-                p_custom_fields.insert(key.to_string(), value.to_string());
+                if p_custom_fields.contains_key(key) {
+                    log::warn!("Duplicate custom field key '{}', overwriting previous value", key);
+                }
+                p_custom_fields.insert(key.to_string(), value.to_string());
             }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f746eb and 78bc79c.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (20 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/event/mod.rs
  • src/handlers/http/audit.rs
  • src/utils/arrow/mod.rs
  • src/event/format/json.rs
  • src/handlers/http/ingest.rs
  • src/event/format/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (6)
src/handlers/http/modal/utils/ingest_utils.rs (6)

19-21: Good addition of required imports.

The new imports for HashMap and HttpRequest are appropriately added to support the custom fields feature.


46-46: Well-defined constant for header exclusions.

Good practice to define a constant array for headers that should be ignored when processing custom headers.


48-53: Function signature properly updated.

The function signature has been correctly updated to accept the new p_custom_fields parameter, aligning with the PR objective of supporting custom fields.


59-59: Consistent propagation of custom fields.

All calls to push_logs have been correctly updated to pass the p_custom_fields parameter, ensuring consistent behavior across different log sources.

Also applies to: 66-67, 73-74, 80-81, 84-85


90-95: Function signature properly updated for push_logs.

The push_logs function signature has been correctly updated to accept custom fields from various log sources.


139-139: Custom fields correctly passed to event processor.

The custom fields are properly passed to the into_event method, ensuring they are included in the event data.

Comment on lines +146 to +171
pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let user_agent = req
        .headers()
        .get("User-Agent")
        .and_then(|a| a.to_str().ok())
        .unwrap_or_default();

    let conn = req.connection_info().clone();

    let source_ip = conn.realip_remote_addr().unwrap_or_default();
    let mut p_custom_fields = HashMap::new();
    p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());

    // Iterate through headers and add custom fields
    for (header_name, header_value) in req.headers().iter() {
        let header_name = header_name.as_str();
        if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
            if let Ok(value) = header_value.to_str() {
                let key = header_name.trim_start_matches("x-p-");
                p_custom_fields.insert(key.to_string(), value.to_string());
            }
        }
    }
    p_custom_fields
}

💡 Verification agent

🧩 Analysis chain

Add sanitization for custom field values.

Consider implementing sanitization for header values to prevent potential injection attacks or malformed data from affecting downstream processing.


🏁 Script executed:

#!/bin/bash
# Check if there's any validation/sanitization for custom fields elsewhere in the codebase

echo "Searching for validation or sanitization of custom fields..."
rg -A 5 -B 5 "custom_fields|p_custom_fields" --type rust

Length of output: 17357


Action Required: Implement Sanitization for Custom Field Values

The function get_custom_fields_from_header in src/handlers/http/modal/utils/ingest_utils.rs (lines 146-171) currently extracts and passes header values directly without any sanitization. This exposes the application to potential injection attacks or issues arising from malformed data.

  • Issue: Header values from headers starting with "x-p-" (and others like "User-Agent") are inserted directly into the custom fields HashMap via direct conversion, with no validation or cleaning.
  • Recommendation: Introduce sanitization measures on these header values. For example, validate the input against an approved character set or use appropriate escaping techniques to ensure that any harmful or malformed data cannot negatively affect downstream processing.
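
One possible shape for such a check, sketched here rather than taken from this PR (the allowed character set and the length cap are illustrative assumptions):

// Hedged sketch: whitelist a conservative character set and cap the length.
// Both the set and the 256-character cap are illustrative choices.
fn sanitize_custom_field_value(raw: &str) -> String {
    raw.chars()
        .filter(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '.' | ' '))
        .take(256)
        .collect()
}

A caller could then insert sanitize_custom_field_value(value) instead of value.to_string() when building p_custom_fields.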

Comment on lines +161 to +169
for (header_name, header_value) in req.headers().iter() {
    let header_name = header_name.as_str();
    if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
        if let Ok(value) = header_value.to_str() {
            let key = header_name.trim_start_matches("x-p-");
            p_custom_fields.insert(key.to_string(), value.to_string());
        }
    }
}

🛠️ Refactor suggestion

Add validation for custom field keys.

There's no validation for the key names after stripping the "x-p-" prefix. If a header like "x-p-" (with nothing after the prefix) is provided, it would result in an empty key. Consider adding validation to ensure non-empty keys.

         if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
             if let Ok(value) = header_value.to_str() {
                 let key = header_name.trim_start_matches("x-p-");
-                p_custom_fields.insert(key.to_string(), value.to_string());
+                if !key.is_empty() {
+                    p_custom_fields.insert(key.to_string(), value.to_string());
+                } else {
+                    log::warn!("Ignoring header with empty key after prefix: {}", header_name);
+                }
             }
         }

pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let user_agent = req
        .headers()
        .get("User-Agent")

Suggested change
-        .get("User-Agent")
+        .get(http::header::USER_AGENT)

de-sh (Contributor) left a comment

Would be great if we had tests to validate behavior of add_parseable_fields
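
A minimal sketch of such a test, assuming the items already in scope in src/utils/arrow/mod.rs (the arrow RecordBatch, Schema, Field, DataType, StringArray, and ArrayRef types, chrono's Utc, HashMap, and the DEFAULT_TIMESTAMP_KEY constant):

#[test]
fn add_parseable_fields_prepends_timestamp_and_appends_custom_fields() {
    // One Utf8 column with two rows.
    let schema = Arc::new(Schema::new(vec![Field::new("msg", DataType::Utf8, true)]));
    let msgs: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let rb = RecordBatch::try_new(schema, vec![msgs]).unwrap();

    let mut custom = HashMap::new();
    custom.insert("environment".to_string(), "dev".to_string());

    let out = add_parseable_fields(rb, Utc::now(), &custom).unwrap();

    // Timestamp column prepended, custom field appended, rows preserved.
    assert_eq!(out.num_columns(), 3);
    assert_eq!(out.num_rows(), 2);
    assert_eq!(out.schema().field(0).name(), DEFAULT_TIMESTAMP_KEY);
    assert_eq!(out.schema().field(1).name(), "msg");
    assert_eq!(out.schema().field(2).name(), "environment");
}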

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/utils/arrow/mod.rs (2)

146-194: Consider adding tests for this new function

Given the importance of this function for the custom fields feature, it would be beneficial to add unit tests that verify:

  • Custom fields are properly added
  • Fields appear in the expected order
  • The row count is preserved
  • Edge cases (empty HashMap, large number of fields, etc.)

185-186: Potential unwrap() risk

The unwrap() call on line 185 could potentially panic if the key doesn't exist in the HashMap. While this appears safe since the keys are directly derived from the HashMap itself in the loop, consider using .expect() with a descriptive message for better error reporting in case of future modifications.

-        let value = p_custom_fields.get(*k).unwrap();
+        let value = p_custom_fields.get(*k).expect("Key must exist in HashMap");
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 78bc79c and fdcf9dd.

📒 Files selected for processing (2)
  • src/event/format/mod.rs (4 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/event/format/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/utils/arrow/mod.rs (2)

146-194: Implementation looks solid with good error handling

The new add_parseable_fields function correctly:

  • Sorts keys for deterministic column order
  • Handles schema and data modifications properly
  • Returns Result for error propagation

The implementation will efficiently add both the timestamp field and custom fields to record batches.


43-48: Imports organized well for the new functionality

The additional imports are correctly organized to support the new functionality for handling custom fields and working with Arrow data types.

Comment on lines +146 to +194
pub fn add_parseable_fields(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    // Return Result for proper error handling

    // Add custom fields in sorted order
    let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
    sorted_keys.sort();

    let schema = rb.schema();
    let row_count = rb.num_rows();

    let mut fields = schema
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect_vec();
    fields.insert(
        0,
        Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    );
    fields.extend(
        sorted_keys
            .iter()
            .map(|k| Field::new(*k, DataType::Utf8, true)),
    );

    let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
    columns.insert(
        0,
        Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
    );
    columns.extend(sorted_keys.iter().map(|k| {
        let value = p_custom_fields.get(*k).unwrap();
        Arc::new(StringArray::from_iter_values(
            std::iter::repeat(value).take(row_count),
        )) as ArrayRef
    }));

    // Create the new schema and batch
    let new_schema = Arc::new(Schema::new(fields));
    RecordBatch::try_new(new_schema, columns)
}

💡 Verification agent

🧩 Analysis chain

Consider handling field name collisions

The function doesn't check for potential duplicate field names between custom fields and existing fields in the RecordBatch. This could lead to unexpected behavior if a custom field has the same name as an existing field. Consider adding validation to detect and handle such collisions.


🏁 Script executed:

#!/bin/bash
# Check if there are any duplicate field name handling mechanisms elsewhere in the codebase
rg -A 2 -B 2 "duplicate.+field|field.+duplicate|field.*collision|collision.*field" --type rust

Length of output: 599


Action Required: Address Potential Field Name Collisions

The add_parseable_fields function in src/utils/arrow/mod.rs currently appends custom fields to an existing RecordBatch without checking for duplicates. The search in src/static_schema.rs confirms that duplicate field name validation is handled elsewhere in the codebase (e.g., through error definitions and tests). Please consider adapting a similar validation strategy here to ensure that if any custom field name conflicts with an existing field in the RecordBatch, it gets detected and handled appropriately. This may involve:

  • Validating the union of the original schema field names and the custom field names before inserting new fields.
  • Reusing or refactoring the duplicate handling mechanism from src/static_schema.rs where applicable.
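
As one sketch of that strategy, assuming the Schema and ArrowError types already imported in this module (the function name and error text are illustrative, not from this PR):

use std::collections::HashSet;

// Hedged sketch: reject custom field names that already exist in the schema,
// mirroring the duplicate-field validation used in src/static_schema.rs.
fn check_field_collisions(schema: &Schema, custom_keys: &[&String]) -> Result<(), ArrowError> {
    let existing: HashSet<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    for key in custom_keys {
        if existing.contains(key.as_str()) {
            return Err(ArrowError::SchemaError(format!(
                "custom field '{key}' collides with an existing column"
            )));
        }
    }
    Ok(())
}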
