feat: add custom fields to events #1228

Merged: 6 commits, Mar 21, 2025
Changes from all commits
10 changes: 7 additions & 3 deletions src/connectors/kafka/processor.rs
@@ -16,20 +16,20 @@
*
*/

use std::sync::Arc;

use async_trait::async_trait;
use futures_util::StreamExt;
use rdkafka::consumer::{CommitMode, Consumer};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error};

use crate::{
connectors::common::processor::Processor,
event::{
format::{json, EventFormat, LogSourceEntry},
Event as ParseableEvent,
Event as ParseableEvent, USER_AGENT_KEY,
},
parseable::PARSEABLE,
storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
}
}

let mut p_custom_fields = HashMap::new();
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());

let p_event = json::Event::new(Value::Array(json_vec)).into_event(
stream_name.to_string(),
total_payload_size,
@@ -85,6 +88,7 @@
time_partition.as_ref(),
schema_version,
StreamType::UserDefined,
&p_custom_fields,
)?;

Ok(p_event)
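Note: the connector stamps every Kafka-sourced batch with a fixed user agent of "kafka". Because the map is a plain HashMap<String, String>, a connector could also fill the other reserved keys declared in src/event/mod.rs further down in this diff. Below is a minimal, hypothetical sketch of that pattern; the PR itself only sets the user agent here, and the helper name, its peer_addr parameter, and the "json" format value are illustrative, not part of the change.

use std::collections::HashMap;

use crate::event::{FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY};

// Hypothetical helper: a connector that also records the wire format and the
// peer address it read from. Only the USER_AGENT_KEY entry is what this PR
// actually sets for Kafka.
fn kafka_custom_fields(peer_addr: Option<&str>) -> HashMap<String, String> {
    let mut fields = HashMap::new();
    fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
    fields.insert(FORMAT_KEY.to_string(), "json".to_string());
    if let Some(addr) = peer_addr {
        fields.insert(SOURCE_IP_KEY.to_string(), addr.to_string());
    }
    fields
}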
2 changes: 2 additions & 0 deletions src/event/format/json.rs
@@ -149,6 +149,7 @@ impl EventFormat for Event {
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
p_custom_fields: &HashMap<String, String>,
) -> Result<super::Event, anyhow::Error> {
let custom_partition_values = match custom_partitions.as_ref() {
Some(custom_partition) => {
@@ -168,6 +169,7 @@
static_schema_flag,
time_partition,
schema_version,
p_custom_fields,
)?;

Ok(super::Event {
26 changes: 7 additions & 19 deletions src/event/format/mod.rs
@@ -33,7 +33,7 @@ use serde_json::Value;
use crate::{
metadata::SchemaVersion,
storage::StreamType,
utils::arrow::{get_field, get_timestamp_array, replace_columns},
utils::arrow::{add_parseable_fields, get_field},
};

use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,9 +145,10 @@ pub trait EventFormat: Sized {
static_schema_flag: bool,
time_partition: Option<&String>,
schema_version: SchemaVersion,
p_custom_fields: &HashMap<String, String>,
) -> Result<(RecordBatch, bool), AnyError> {
let p_timestamp = self.get_p_timestamp();
let (data, mut schema, is_first) = self.to_data(
let (data, schema, is_first) = self.to_data(
storage_schema,
time_partition,
schema_version,
@@ -161,16 +162,6 @@
));
};

// add the p_timestamp field to the event schema to the 0th index
schema.insert(
0,
Arc::new(Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)),
);

// prepare the record batch and new fields to be added
let mut new_schema = Arc::new(Schema::new(schema));
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,13 +170,9 @@
new_schema =
update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);

let mut rb = Self::decode(data, new_schema.clone())?;
rb = replace_columns(
rb.schema(),
&rb,
&[0],
&[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
);
let rb = Self::decode(data, new_schema.clone())?;

let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;

Ok((rb, is_first))
}
@@ -223,6 +210,7 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
stream_type: StreamType,
p_custom_fields: &HashMap<String, String>,
) -> Result<Event, AnyError>;
}

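Note: the manual insertion of the p_timestamp column (and the replace_columns call that followed decode) is now folded into utils::arrow::add_parseable_fields, whose body is not part of this excerpt. A rough sketch of what such a helper could look like follows, assuming it prepends the timestamp column and appends one Utf8 column per custom field, and assuming the arrow-array / arrow-schema and chrono crates. The function name matches the import above, but the signature and behavior here are inferred, not copied from the PR.

use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};

use crate::event::DEFAULT_TIMESTAMP_KEY;

// Inferred sketch of the helper used above; the real implementation in
// src/utils/arrow.rs may differ (e.g. in column ordering or error handling).
pub fn add_parseable_fields(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    let num_rows = rb.num_rows();

    // p_timestamp stays at index 0, as the removed inline code arranged.
    let mut fields = vec![Arc::new(Field::new(
        DEFAULT_TIMESTAMP_KEY,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ))];
    fields.extend(rb.schema().fields().iter().cloned());

    let mut columns: Vec<ArrayRef> = vec![Arc::new(TimestampMillisecondArray::from_value(
        p_timestamp.timestamp_millis(),
        num_rows,
    ))];
    columns.extend(rb.columns().iter().cloned());

    // One Utf8 column per custom field, the same value repeated on every row.
    // HashMap iteration order is unspecified; a real implementation may sort.
    for (key, value) in p_custom_fields {
        fields.push(Arc::new(Field::new(key.as_str(), DataType::Utf8, true)));
        columns.push(Arc::new(StringArray::from(vec![value.as_str(); num_rows])));
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}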
3 changes: 3 additions & 0 deletions src/event/mod.rs
@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
pub const USER_AGENT_KEY: &str = "p_user_agent";
pub const SOURCE_IP_KEY: &str = "p_src_ip";
pub const FORMAT_KEY: &str = "p_format";

#[derive(Clone)]
pub struct Event {
3 changes: 2 additions & 1 deletion src/handlers/http/audit.rs
@@ -23,6 +23,7 @@ use actix_web::{
middleware::Next,
};
use actix_web_httpauth::extractors::basic::BasicAuth;
use http::header::USER_AGENT;
use ulid::Ulid;

use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
)
.with_user_agent(
req.headers()
.get("User-Agent")
.get(USER_AGENT)
.and_then(|a| a.to_str().ok())
.unwrap_or_default(),
)
80 changes: 63 additions & 17 deletions src/handlers/http/ingest.rs
@@ -41,7 +41,7 @@ use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::flatten_and_push_logs;
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;

@@ -72,6 +72,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
return Err(PostError::OtelNotSupported);
}

let p_custom_fields = get_custom_fields_from_header(req);

let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
PARSEABLE
.create_stream_if_not_exists(
@@ -81,7 +83,7 @@
)
.await?;

flatten_and_push_logs(json, &stream_name, &log_source).await?;
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
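Note: get_custom_fields_from_header is imported near the top of this file but defined in modal/utils/ingest_utils.rs, which is outside this excerpt. Below is a plausible sketch of such a helper, assuming it captures the client's User-Agent and source address under the keys declared in src/event/mod.rs; the actual helper in the PR may record more, fewer, or different fields.

use std::collections::HashMap;

use actix_web::HttpRequest;
use http::header::USER_AGENT;

use crate::event::{SOURCE_IP_KEY, USER_AGENT_KEY};

// Sketch only; the signature takes the request by value to match the call
// sites above.
pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let mut p_custom_fields = HashMap::new();

    // Record the caller's User-Agent header, if present and valid UTF-8.
    if let Some(agent) = req.headers().get(USER_AGENT).and_then(|v| v.to_str().ok()) {
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), agent.to_string());
    }

    // Record the (possibly proxy-resolved) client address.
    let conn = req.connection_info();
    if let Some(ip) = conn.realip_remote_addr() {
        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), ip.to_string());
    }

    p_custom_fields
}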
@@ -102,6 +104,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
None,
SchemaVersion::V0,
StreamType::Internal,
&HashMap::new(),
)?
.process()?;

@@ -143,8 +146,9 @@ pub async fn handle_otel_logs_ingestion(
vec![log_source_entry],
)
.await?;
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source).await?;
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -166,6 +170,7 @@
if log_source != LogSource::OtelMetrics {
return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
}

let stream_name = stream_name.to_str().unwrap().to_owned();
let log_source_entry = LogSourceEntry::new(
log_source.clone(),
@@ -182,7 +187,9 @@
)
.await?;

flatten_and_push_logs(json, &stream_name, &log_source).await?;
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -222,7 +229,9 @@
)
.await?;

flatten_and_push_logs(json, &stream_name, &log_source).await?;
let p_custom_fields = get_custom_fields_from_header(req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -271,7 +280,8 @@ pub async fn post_event(
return Err(PostError::OtelNotSupported);
}

flatten_and_push_logs(json, &stream_name, &log_source).await?;
let p_custom_fields = get_custom_fields_from_header(req);
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;

Ok(HttpResponse::Ok().finish())
}
@@ -421,7 +431,13 @@ mod tests {
});

let (rb, _) = json::Event::new(json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V0,
&HashMap::new(),
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -449,7 +465,13 @@
});

let (rb, _) = json::Event::new(json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V0,
&HashMap::new(),
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -481,7 +503,7 @@
);

let (rb, _) = json::Event::new(json)
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -513,7 +535,7 @@
);

assert!(json::Event::new(json)
.into_recordbatch(&schema, false, None, SchemaVersion::V0,)
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err());
}

@@ -531,7 +553,7 @@
);

let (rb, _) = json::Event::new(json)
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 1);
@@ -572,7 +594,13 @@
]);

let (rb, _) = json::Event::new(json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V0,
&HashMap::new(),
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -620,7 +648,13 @@
]);

let (rb, _) = json::Event::new(json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V0,
&HashMap::new(),
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -669,7 +703,7 @@
);

let (rb, _) = json::Event::new(json)
.into_recordbatch(&schema, false, None, SchemaVersion::V0)
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.unwrap();

assert_eq!(rb.num_rows(), 3);
@@ -718,7 +752,7 @@
);

assert!(json::Event::new(json)
.into_recordbatch(&schema, false, None, SchemaVersion::V0,)
.into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
.is_err());
}

@@ -758,7 +792,13 @@
.unwrap();

let (rb, _) = json::Event::new(flattened_json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V0,
&HashMap::new(),
)
.unwrap();
assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 5);
@@ -841,7 +881,13 @@
.unwrap();

let (rb, _) = json::Event::new(flattened_json)
.into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
.into_recordbatch(
&HashMap::default(),
false,
None,
SchemaVersion::V1,
&HashMap::new(),
)
.unwrap();

assert_eq!(rb.num_rows(), 4);
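Note: all of the updated tests pass an empty map, so their row- and column-count assertions are untouched. For completeness, a hedged sketch of a test exercising the new parameter, assuming custom fields surface as additional Utf8 columns named after their keys; that behavior is implied by the PR but not visible in this excerpt, so the expected column count below is an assumption, not taken from the change.

#[test]
fn custom_fields_become_columns() {
    use crate::event::USER_AGENT_KEY;

    let json = json!({
        "a": 1,
        "b": "hello",
    });

    let mut p_custom_fields = HashMap::new();
    p_custom_fields.insert(USER_AGENT_KEY.to_string(), "postman".to_string());

    let (rb, _) = json::Event::new(json)
        .into_recordbatch(
            &HashMap::default(),
            false,
            None,
            SchemaVersion::V0,
            &p_custom_fields,
        )
        .unwrap();

    assert_eq!(rb.num_rows(), 1);
    // a, b, p_timestamp, plus one column per custom field (assumed layout).
    assert_eq!(rb.num_columns(), 4);
    assert!(rb.schema().column_with_name(USER_AGENT_KEY).is_some());
}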