refactor: event handling in parseable #1218

Status: Open

Wants to merge 44 commits into base: main

Commits (44), diff shown for changes from 34 commits
e2a1fc3  refactor: accept array in `push_logs` (de-sh, Feb 28, 2025)
d04ba90  move tests to associated module (de-sh, Feb 28, 2025)
5798b7b  refactor: remove `is_schema_matching` (de-sh, Feb 28, 2025)
78da6b4  doc: improve readability (de-sh, Mar 1, 2025)
938c33d  simplify `replace_columns` (de-sh, Mar 1, 2025)
6fb5385  ci: fix imports (de-sh, Mar 1, 2025)
23155b6  push flattening into `Event` (de-sh, Mar 1, 2025)
d604b65  refactor: move kinesis to lib level (de-sh, Mar 1, 2025)
303ba35  refactor: perform flattening in `to_data` alone (de-sh, Mar 1, 2025)
c2faefc  refactor: further streamline, associate w/ `Parseable` (de-sh, Mar 1, 2025)
354061a  ci: deepsource suggestion (de-sh, Mar 1, 2025)
1386d3b  fix: flattening (de-sh, Mar 1, 2025)
38a52c2  remove unused code (de-sh, Mar 1, 2025)
a7b2db3  fix: partitioning (de-sh, Mar 1, 2025)
dc34a85  refactor: share `Stream` state when processing (de-sh, Mar 1, 2025)
19708df  refactor: `Parseable::commit_schema` (de-sh, Mar 1, 2025)
7215e8e  map schema keys to recordbatches (de-sh, Mar 1, 2025)
975b1f6  construct map directly (de-sh, Mar 1, 2025)
4c1f6d8  fix: concat to not lose data (de-sh, Mar 1, 2025)
5a2bcc1  refactor: extract `byte_size` during json deserialization (de-sh, Mar 1, 2025)
f268422  ci: clippy suggestion (de-sh, Mar 2, 2025)
d304b9f  feat: `DiskWriter` handles writing to arrow part file (de-sh, Mar 2, 2025)
97f5603  test: fix expectation (de-sh, Mar 2, 2025)
a9513c4  refactor: don't add a step (de-sh, Mar 2, 2025)
1c98e3b  refactor: `get_schemas_if_present` (de-sh, Mar 2, 2025)
a86fc47  refactor: `prepare_and_validate_schema` (de-sh, Mar 3, 2025)
51d166e  refactor: event construction and processing (de-sh, Mar 3, 2025)
218080e  fix: concat at once (de-sh, Mar 3, 2025)
aa0befa  refactor: separate out flatten from schema inference (de-sh, Mar 3, 2025)
da5e16c  style: `anyhow::Result` (de-sh, Mar 3, 2025)
9b0d865  fix: rb per object (de-sh, Mar 3, 2025)
5bbd309  Merge remote-tracking branch 'origin/main' (de-sh, Mar 4, 2025)
d096ce0  perf: partition at json level (de-sh, Mar 4, 2025)
e15b0d2  style: deepsource suggestion (de-sh, Mar 4, 2025)
b106106  Merge branch 'main' into main (de-sh, Mar 7, 2025)
ef27f97  chore: remove unused (de-sh, Mar 10, 2025)
b8606b3  fix: custom partitioned file names (de-sh, Mar 10, 2025)
fbd3a18  perf: use a buffer (de-sh, Mar 10, 2025)
c2f6769  refactor: drop to flush (de-sh, Mar 10, 2025)
592d3a1  fix & test: prefix generation (de-sh, Mar 10, 2025)
6bb7afc  Merge branch 'main' into main (de-sh, Mar 14, 2025)
924ef5b  Merge remote-tracking branch 'origin/main' (de-sh, Mar 16, 2025)
b108f92  Merge branch 'main' into main (de-sh, Mar 16, 2025)
3d8c338  spinoff #1251 (de-sh, Mar 18, 2025)
Cargo.toml (3 changes: 2 additions, 1 deletion)

@@ -9,7 +9,8 @@ build = "build.rs"
 
 [dependencies]
 # Arrow and DataFusion ecosystem
-arrow-array = { version = "53.0.0" }
+arrow = "53.0.0"
+arrow-array = "53.0.0"
 arrow-flight = { version = "53.0.0", features = ["tls"] }
 arrow-ipc = { version = "53.0.0", features = ["zstd"] }
 arrow-json = "53.0.0"
src/connectors/kafka/processor.rs (46 changes: 16 additions, 30 deletions)

@@ -27,10 +27,7 @@ use tracing::{debug, error};
 
 use crate::{
     connectors::common::processor::Processor,
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event as ParseableEvent,
-    },
+    event::format::{json, EventFormat, LogSource},
     parseable::PARSEABLE,
     storage::StreamType,
 };
@@ -41,10 +38,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<usize> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -54,35 +48,27 @@ impl ParseableSinkProcessor {
             .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
             .await?;
 
-        let stream = PARSEABLE.get_stream(stream_name)?;
-        let schema = stream.get_schema_raw();
-        let time_partition = stream.get_time_partition();
-        let custom_partition = stream.get_custom_partition();
-        let static_schema_flag = stream.get_static_schema_flag();
-        let schema_version = stream.get_schema_version();
-
         let mut json_vec = Vec::with_capacity(records.len());
-        let mut total_payload_size = 0u64;
+        let mut total_payload_size = 0;
 
         for record in records.iter().filter_map(|r| r.payload.as_ref()) {
-            total_payload_size += record.len() as u64;
+            total_payload_size += record.len();
             if let Ok(value) = serde_json::from_slice::<Value>(record) {
                 json_vec.push(value);
             }
         }
 
-        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
-            stream_name.to_string(),
+        let stream = PARSEABLE.get_or_create_stream(stream_name);
+
+        json::Event::new(
+            Value::Array(json_vec),
             total_payload_size,
-            &schema,
-            static_schema_flag,
-            custom_partition.as_ref(),
-            time_partition.as_ref(),
-            schema_version,
-            StreamType::UserDefined,
-        )?;
-
-        Ok(p_event)
+            LogSource::Custom("Kafka".to_owned()),
+        )
+        .into_event(&stream)?
+        .process(&stream)?;
+
+        Ok(total_payload_size)
     }
 }
 
@@ -92,9 +78,9 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {len} records");
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        let size = self.process_event_from_chunk(&records).await?;
 
-        debug!("Processed {len} records");
+        debug!("Processed {len} records, size = {size} Bytes");
         Ok(())
     }
 }
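For reviewers who want the end state at a glance, here is a minimal sketch of the ingestion path this PR converges on, assembled from the processor.rs hunks above. The `create_stream_if_not_exists`, `get_or_create_stream`, `json::Event::new`, `into_event`, and `process` calls are taken from the diff itself; the wrapper function `ingest_json_batch`, its signature, the sample inputs, and the comments are illustrative assumptions rather than code from the PR.

```rust
// Hypothetical helper inside the parseable crate, mirroring the refactored
// Kafka sink path above. Only the call chain is taken from this PR's diff;
// the function name and inputs are illustrative.
use serde_json::Value;

use crate::{
    event::format::{json, EventFormat, LogSource},
    parseable::PARSEABLE,
    storage::StreamType,
};

async fn ingest_json_batch(stream_name: &str, payloads: &[Vec<u8>]) -> anyhow::Result<usize> {
    // Ensure the stream exists before touching it (same call as in the diff).
    PARSEABLE
        .create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
        .await?;

    // Deserialize each raw payload and keep a running byte count, which the
    // refactored `json::Event::new` signature takes up front.
    let mut json_vec = Vec::with_capacity(payloads.len());
    let mut total_payload_size = 0;
    for payload in payloads {
        total_payload_size += payload.len();
        if let Ok(value) = serde_json::from_slice::<Value>(payload) {
            json_vec.push(value);
        }
    }

    // After the refactor, the event only needs the JSON, its byte size, and a
    // log source label; per the commit titles, schema and partition state now
    // travel with the shared `Stream` handle instead of being passed in.
    let stream = PARSEABLE.get_or_create_stream(stream_name);
    json::Event::new(
        Value::Array(json_vec),
        total_payload_size,
        LogSource::Custom("Kafka".to_owned()),
    )
    .into_event(&stream)? // per the commit titles, flattening and schema inference happen here
    .process(&stream)?;   // writes record batches through the shared `Stream` state

    Ok(total_payload_size)
}
```

Compared to the old `build_event_from_chunk`, the caller no longer fetches the raw schema, time/custom partitions, static-schema flag, or schema version up front; the method now returns the total payload size so the processor can log it.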