refactor: retire `to_recordbatch` in favor of `to_event` #1181

Conversation
Walkthrough

This update streamlines and simplifies event processing and ingestion across multiple modules. In the Kafka connector, the event processing method has been renamed and refactored to process events directly. In the event formatting modules, new partition extraction logic is introduced, redundant methods are removed, and method signatures are adjusted for unified error handling. The `Event` struct now delegates stream name handling to its processing methods via a passed stream parameter. HTTP ingestion functions also use the stream object directly, reducing parameter complexity. Finally, the public exports for the `parseable` module now include the `Stream` type.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka Consumer
    participant Processor as ParseableSinkProcessor
    participant Formatter as Event Format
    Kafka->>Processor: Provide event chunk (records)
    Processor->>Formatter: Invoke to_event (internal conversion)
    Formatter-->>Processor: Return processed JSON event
    Processor-->>Kafka: Return processing Result
```
```mermaid
sequenceDiagram
    participant HTTP as HTTP Ingest Handler
    participant PARSE as Parseable Module
    participant Stream as Stream Object
    participant Formatter as Event Format
    participant EventProc as Event Processor
    HTTP->>PARSE: get_stream(stream_name)
    PARSE-->>HTTP: Return Stream object
    HTTP->>Formatter: Convert JSON to event (to_event with stream)
    Formatter-->>HTTP: Return event
    HTTP->>EventProc: Process event (using stream parameter)
    EventProc-->>HTTP: Return processing Result
```
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/event/mod.rs (2)
`53-89`: **Use of `stream_name` from `Stream` in `process`**

Removing the `stream_name` field from `Event` and referencing `stream.stream_name` directly is consistent with the overall goal of externalizing stream-related data. The logic (committing schema, pushing to storage, updating stats, notifying livetail) remains coherent.

Consider factoring out the repeated schema-commit and stats-update logic into a helper if further expansions or variants of `process` are planned, reducing potential duplication; a sketch follows below.
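For concreteness, a minimal sketch of such a helper, written with stand-in types and assumed method names (`commit_schema`, `update_stats`); this is not the crate's actual API:

```rust
use std::sync::Arc;

// Stand-ins so the sketch is self-contained; not the crate's real types.
struct Schema;
struct Stream;

impl Stream {
    fn commit_schema(&self, _schema: Arc<Schema>) -> anyhow::Result<()> {
        Ok(())
    }
    fn update_stats(&self, _rows: usize, _bytes: usize) {}
}

// Shared tail of `process` and any future variants: commit newly
// inferred fields, then record ingestion stats, in one place.
fn commit_and_record(
    stream: &Stream,
    schema: Arc<Schema>,
    rows: usize,
    bytes: usize,
) -> anyhow::Result<()> {
    stream.commit_schema(schema)?;
    stream.update_stats(rows, bytes);
    Ok(())
}

fn main() -> anyhow::Result<()> {
    commit_and_record(&Stream, Arc::new(Schema), 10, 1024)
}
```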
`91-103`: **Minimal validations in `process_unchecked`**

`process_unchecked` skips schema commit and stats updates, which appears intentional. The usage of `stream.stream_name` here is likewise consistent.

You might consider centralizing the duplicated push call into a shared helper to avoid divergence over time.
src/event/format/mod.rs (1)

`118-179`: **New `to_event` method**

This method encapsulates event creation by merging decoded data with partition and schema information, then returning a finalized `Event`. The insertion of `p_timestamp` as a reserved field is clearly enforced.

Consider making `origin_format` configurable or automatically inferred if multiple formats are supported; currently, it's hardcoded to `"json"`. One possible shape is sketched below.
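The sketch assumes an `OriginFormat` enum as one way to do this; illustrative only, not the crate's current API:

```rust
// Illustrative alternative to the hardcoded "json": each event format
// reports its origin via an enum, so new formats extend one place.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OriginFormat {
    Json,
    // e.g. Otel, Syslog, ... added here as they gain support.
}

impl OriginFormat {
    pub fn as_str(self) -> &'static str {
        match self {
            OriginFormat::Json => "json",
        }
    }
}

fn main() {
    // to_event would then record origin.as_str() instead of "json".
    let origin = OriginFormat::Json;
    assert_eq!(origin.as_str(), "json");
}
```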
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)

- src/connectors/kafka/processor.rs (4 hunks)
- src/event/format/json.rs (2 hunks)
- src/event/format/mod.rs (4 hunks)
- src/event/mod.rs (4 hunks)
- src/handlers/http/ingest.rs (4 hunks)
- src/handlers/http/modal/utils/ingest_utils.rs (1 hunks)
- src/parseable/mod.rs (1 hunks)
🔇 Additional comments (14)
src/parseable/mod.rs (1)
`31-31`: **Adding `Stream` to public exports is appropriate for the refactoring.**

This change makes the `Stream` type accessible to other modules, which aligns with the PR objective of refactoring event processing to use the `Stream` object directly rather than passing individual stream properties.

src/handlers/http/modal/utils/ingest_utils.rs (1)
`114-115`: **Simplified the event processing workflow.**

The event processing has been refactored to use the stream object directly with the new `to_event` method instead of extracting and passing individual properties. This change:

- Reduces parameter complexity
- Makes the code more maintainable
- Creates a more consistent API across the codebase

This aligns with the PR objective of retiring `to_recordbatch` in favor of `to_event`.

src/connectors/kafka/processor.rs (3)
`30-31`: **Properly updated imports to include `EventFormat`.**

The imports have been appropriately updated to include both `json` and `EventFormat`, which are required for the new event processing workflow.
`41-68`: **Method signature and implementation improved for clarity and efficiency.**

The method has been significantly refactored from `build_event_from_chunk` to `process_event_from_chunk` with key improvements:

- Return type changed from `ParseableEvent` to `Result<()>`, reflecting that event processing now happens directly
- Eliminated several schema-related variables that were previously extracted from the stream
- Uses the `Stream` object directly for event creation and processing
- Simplified the event creation and processing workflow

This change creates a more streamlined event processing pipeline and reduces code complexity.
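A compilable before/after sketch of the signature shift, using stand-in types; the real `ConsumerRecord`, `Stream`, and event types differ in detail, and whether the stream is passed in or fetched internally is an assumption here:

```rust
use anyhow::Result;

// Stand-ins so the sketch compiles on its own; not the crate's real types.
struct ConsumerRecord;
struct Stream;
struct ParseableEvent;
struct ParseableSinkProcessor;

impl ParseableSinkProcessor {
    // Before: build and hand back an event; the caller processed it later.
    #[allow(dead_code)] // kept only for comparison
    fn build_event_from_chunk(&self, _records: &[ConsumerRecord]) -> Result<ParseableEvent> {
        Ok(ParseableEvent)
    }

    // After: convert via to_event and process against the stream in place,
    // so the caller sees only success or failure.
    fn process_event_from_chunk(&self, _records: &[ConsumerRecord], _stream: &Stream) -> Result<()> {
        // 1. decode records into JSON
        // 2. json::Event::to_event(.., stream)
        // 3. event.process(stream)?
        Ok(())
    }
}

fn main() -> Result<()> {
    let processor = ParseableSinkProcessor;
    processor.process_event_from_chunk(&[], &Stream)
}
```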
`77-77`: **Updated method call to match the new implementation.**

The method call in the `process` implementation has been correctly updated to use the new `process_event_from_chunk` method.

src/handlers/http/ingest.rs (2)
`82-88`: **Streamlined the internal stream ingestion process.**

The `ingest_internal_stream` function has been refactored to use the stream object directly rather than extracting and passing individual properties, making the code cleaner and more maintainable.
`233-245`: **Simplified unchecked log pushing.**

The `push_logs_unchecked` function now properly retrieves the stream object and uses it directly in `process_unchecked`, which is consistent with the refactoring pattern across the codebase.

src/event/mod.rs (1)
`30-30`: **Imports refactored to include `Stream`**

No issues found with this added import. The inclusion of `Stream` aligns with the shift toward passing stream references.

src/event/format/json.rs (3)
`33-34`: **New imports for `EventFormat` and `EventSchema`**

The added imports are aligned with the trait and struct usage below.
`58-77`: **New partition extraction method `get_partitions`**

Effective handling of both optional `time_partition` and `custom_partitions`. The fallback to the ingestion timestamp if `time_partition` is missing is clear.
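A self-contained sketch of the described fallback, with assumed parameter names and a simplified return type (the real method returns the crate's own partition types):

```rust
use chrono::{DateTime, Utc};
use serde_json::Value;

// Prefer the configured time-partition field; fall back to the ingestion
// timestamp when none is configured. Custom partition values are pulled
// from the payload by key. Names here are assumptions, not the real API.
fn get_partitions(
    json: &Value,
    p_timestamp: DateTime<Utc>,
    time_partition: Option<&str>,
    custom_partitions: Option<&[String]>,
) -> anyhow::Result<(DateTime<Utc>, Vec<(String, String)>)> {
    let time = match time_partition {
        Some(field) => json
            .get(field)
            .and_then(Value::as_str)
            .ok_or_else(|| anyhow::anyhow!("missing time partition field: {field}"))?
            .parse::<DateTime<Utc>>()?,
        None => p_timestamp,
    };

    let custom = custom_partitions
        .unwrap_or(&[])
        .iter()
        .filter_map(|key| {
            json.get(key)
                .map(|v| (key.clone(), v.to_string().trim_matches('"').to_owned()))
        })
        .collect();

    Ok((time, custom))
}

fn main() -> anyhow::Result<()> {
    let json = serde_json::json!({"source": "app1", "ts": "2024-01-01T00:00:00Z"});
    let (time, custom) =
        get_partitions(&json, Utc::now(), Some("ts"), Some(&["source".to_string()]))?;
    println!("time={time} custom={custom:?}");
    Ok(())
}
```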
`86-86`: **Switched to `anyhow::Result` in `to_data`**

Using `anyhow::Result` unifies error handling with other utilities in this file and across the codebase.

src/event/format/mod.rs (3)
`26-39`: **Introduced `anyhow` and additional chrono imports**

No conflicts or issues with these expanded imports. They facilitate the new approach to unified error handling and timestamp management.
`100-105`: **Partition logic definition in the trait**

Adding `fn get_partitions` directly to the trait ensures each event format implements consistent partition-extraction logic. The signature and return type look appropriate.
`111-113`: **Unified error types for `to_data` and `decode`**

Switching to `anyhow::Result` further standardizes error handling. This simplifies function signatures and helps keep the code uniform.
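For readers unfamiliar with the pattern, a small standalone example of the unification `anyhow` buys; the use of serde_json here is illustrative, not taken from the PR:

```rust
use anyhow::Context;

// With anyhow::Result, parse errors, I/O errors, and domain errors all
// flow through one signature instead of per-module error enums.
fn decode(raw: &str) -> anyhow::Result<serde_json::Value> {
    let value = serde_json::from_str(raw).context("payload is not valid JSON")?;
    Ok(value)
}

fn main() -> anyhow::Result<()> {
    let value = decode(r#"{"a": 1}"#)?;
    println!("{value}");
    Ok(())
}
```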
```rust
// assert_eq!(
//     rb.column_by_name("c_a")
//         .unwrap()
//         .as_any()
//         .downcast_ref::<ListArray>()
//         .unwrap(),
//     &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
//         None,
//         None,
//         Some(vec![Some(1i64)]),
//         Some(vec![Some(1)])
//     ])
// );

// assert_eq!(
//     rb.column_by_name("c_b")
//         .unwrap()
//         .as_any()
//         .downcast_ref::<ListArray>()
//         .unwrap(),
//     &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
//         None,
//         None,
//         None,
//         Some(vec![Some(2i64)])
//     ])
// );
// }

// #[test]
// fn arr_obj_with_nested_type_v1() {
//     let json = json!([
//         {
//             "a": 1,
//             "b": "hello",
//         },
//         {
//             "a": 1,
//             "b": "hello",
//         },
//         {
//             "a": 1,
//             "b": "hello",
//             "c": [{"a": 1}]
//         },
//         {
//             "a": 1,
//             "b": "hello",
//             "c": [{"a": 1, "b": 2}]
//         },
//     ]);
//     let flattened_json = convert_to_array(
//         convert_array_to_object(
//             json,
//             None,
//             None,
//             None,
//             SchemaVersion::V1,
//             &crate::event::format::LogSource::default(),
//         )
//         .unwrap(),
//     )
//     .unwrap();

//     let (rb, _) = into_event_batch(
//         flattened_json,
//         HashMap::default(),
//         false,
//         None,
//         SchemaVersion::V1,
//     )
//     .unwrap();

//     assert_eq!(rb.num_rows(), 4);
//     assert_eq!(rb.num_columns(), 5);
//     assert_eq!(
//         rb.column_by_name("a").unwrap().as_float64_arr().unwrap(),
//         &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
//     );
//     assert_eq!(
//         rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(),
//         &StringArray::from(vec![
//             Some("hello"),
//             Some("hello"),
//             Some("hello"),
//             Some("hello")
//         ])
//     );

//     assert_eq!(
//         rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(),
//         &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
//     );

//     assert_eq!(
//         rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(),
//         &Float64Array::from(vec![None, None, None, Some(2.0)])
//     );
// }
// }
```
💡 Verification agent
❓ Verification inconclusive
**Consider updating or removing commented-out tests.**

A large portion of test code has been commented out, which appears to be related to the retired `to_recordbatch` functionality. Since the PR is retiring this functionality, the tests should either be:

- Updated to use the new `to_event` method and uncommented, or
- Removed entirely if they're no longer applicable

Commented-out code can lead to confusion and maintenance issues in the future.
🏁 Script executed:

```bash
#!/bin/bash
# Check if there are any other tests that would provide coverage for the new functionality
# Look for tests that might be using the new to_event function
rg -p "to_event.*test" --type rust
```
Length of output: 36
**Clean Up Deprecated Test Code in src/handlers/http/ingest.rs**

It appears that the commented-out tests (covering the retired `to_recordbatch` functionality) have not been replaced with tests for the new `to_event` method, as no active tests using `to_event` were found. Please either update these tests to use the new `to_event` method or remove them entirely to avoid confusion and prevent future maintenance issues. A hypothetical skeleton for a revived test follows below.
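If the tests are revived rather than removed, one possible skeleton; `test_stream()` and the exact `to_event` signature are assumptions, not the crate's confirmed API:

```rust
#[cfg(test)]
mod tests {
    // Hypothetical shape for reviving one retired test against to_event.
    // The real signature lives in src/event/format/mod.rs and may differ.
    #[test]
    fn basic_object_into_event() {
        let json = serde_json::json!({ "a": 1, "b": "hello" });

        // let stream = test_stream();                        // assumed helper
        // let event = json::Event::new(json).to_event(&stream).unwrap();
        // assert_eq!(event.rb.num_rows(), 1);
        // assert_eq!(event.rb.num_columns(), 3);             // a, b, p_timestamp

        let _ = json; // placeholder until wired to the real API
    }
}
```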
Fixes #XXXX.
Description
Please note that this PR requires #1180 to be merged first.
This PR has:
Summary by CodeRabbit
New Features
Refactor