Commit 6fe35a6

feat: add custom fields to events (#1228)
- `p_user_agent`: fetched from the request's User-Agent header
- `p_src_ip`: fetched from the connection info of the request
- Users can add additional headers to the ingest APIs in the format `x-p-<key-name>: <value>`, e.g. `x-p-environment: dev` makes the server add an `environment` field with the value `dev` to each event. Multiple custom headers can be sent, and each is inserted as a separate field in the event (see the usage sketch below).
1 parent 1afa318 commit 6fe35a6
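
As a usage sketch (not part of this commit): a client call exercising the new headers might look like the following, assuming a local Parseable server on port 8000 with default credentials, a stream named `demo`, and the `reqwest`/`tokio` crates with the `json` feature enabled.

```rust
use reqwest::Client;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = Client::new()
        .post("http://localhost:8000/api/v1/ingest")
        .basic_auth("admin", Some("admin")) // assumed default credentials
        .header("X-P-Stream", "demo") // target stream (assumed name)
        // custom field: the server adds an `environment` column with value `dev`
        .header("x-p-environment", "dev")
        // p_user_agent and p_src_ip are filled in server-side from this request
        .json(&json!({ "level": "info", "message": "hello" }))
        .send()
        .await?;
    println!("ingest status: {}", resp.status());
    Ok(())
}
```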

8 files changed (+291 -52 lines)

src/connectors/kafka/processor.rs (+7 -3)

```diff
@@ -16,20 +16,20 @@
  *
  */
 
-use std::sync::Arc;
-
 use async_trait::async_trait;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};
 
 use crate::{
     connectors::common::processor::Processor,
     event::{
         format::{json, EventFormat, LogSourceEntry},
-        Event as ParseableEvent,
+        Event as ParseableEvent, USER_AGENT_KEY,
     },
     parseable::PARSEABLE,
     storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor
             }
         }
 
+        let mut p_custom_fields = HashMap::new();
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
+
         let p_event = json::Event::new(Value::Array(json_vec)).into_event(
             stream_name.to_string(),
             total_payload_size,
@@ -85,6 +88,7 @@ impl ParseableSinkProcessor
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            &p_custom_fields,
         )?;
 
         Ok(p_event)
```

src/event/format/json.rs (+2)

```diff
@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
         )?;
 
         Ok(super::Event {
```

src/event/format/mod.rs (+7 -19)

```diff
@@ -33,7 +33,7 @@ use serde_json::Value;
 use crate::{
     metadata::SchemaVersion,
     storage::StreamType,
-    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+    utils::arrow::{add_parseable_fields, get_field},
 };
 
 use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,9 +145,10 @@ pub trait EventFormat: Sized {
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) = self.to_data(
+        let (data, schema, is_first) = self.to_data(
             storage_schema,
             time_partition,
             schema_version,
@@ -161,16 +162,6 @@ pub trait EventFormat: Sized {
             ));
         };
 
-        // add the p_timestamp field to the event schema to the 0th index
-        schema.insert(
-            0,
-            Arc::new(Field::new(
-                DEFAULT_TIMESTAMP_KEY,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            )),
-        );
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,13 +170,9 @@ pub trait EventFormat: Sized {
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
 
-        let mut rb = Self::decode(data, new_schema.clone())?;
-        rb = replace_columns(
-            rb.schema(),
-            &rb,
-            &[0],
-            &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
-        );
+        let rb = Self::decode(data, new_schema.clone())?;
+
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
 
         Ok((rb, is_first))
     }
@@ -223,6 +210,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<Event, AnyError>;
 }
 
```
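
The `add_parseable_fields` helper called above lives in `src/utils/arrow`, one of the changed files not shown in this view. Below is a minimal sketch of what it plausibly does given how it is called here: prepend the `p_timestamp` column that `replace_columns` used to patch in at index 0, then append one constant Utf8 column per custom field. The crate names, literal `"p_timestamp"` key, and exact signature are assumptions; the real implementation may differ.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
use chrono::{DateTime, Utc};

/// Hypothetical sketch of the helper (not the code from this commit).
pub fn add_parseable_fields(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    let num_rows = rb.num_rows();

    // p_timestamp goes first, mirroring the old 0th-index insertion.
    let mut fields = vec![Arc::new(Field::new(
        "p_timestamp",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ))];
    let mut columns: Vec<ArrayRef> = vec![Arc::new(TimestampMillisecondArray::from(
        vec![p_timestamp.timestamp_millis(); num_rows],
    ))];

    // Keep the original schema and columns as-is.
    fields.extend(rb.schema().fields().iter().cloned());
    columns.extend(rb.columns().iter().cloned());

    // Each custom field becomes a column holding the same value in every
    // row. (HashMap order is arbitrary; the real helper may sort keys.)
    for (key, value) in p_custom_fields {
        fields.push(Arc::new(Field::new(key, DataType::Utf8, true)));
        columns.push(Arc::new(StringArray::from(vec![value.as_str(); num_rows])));
    }

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```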

src/event/mod.rs (+3)

```diff
@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+pub const USER_AGENT_KEY: &str = "p_user_agent";
+pub const SOURCE_IP_KEY: &str = "p_src_ip";
+pub const FORMAT_KEY: &str = "p_format";
 
 #[derive(Clone)]
 pub struct Event {
```
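
Purely for illustration (invented values, not captured server output): with these reserved keys, an event posted with an `x-p-environment: dev` header ends up shaped roughly as below. `FORMAT_KEY` (`p_format`) is also reserved here, but its use is not visible in this diff view.

```rust
use serde_json::json;

fn main() {
    // Invented values; shows where each reserved key comes from.
    let enriched = json!({
        "p_timestamp": "2025-01-01T00:00:00.000", // DEFAULT_TIMESTAMP_KEY, set by the server
        "p_user_agent": "curl/8.5.0",             // USER_AGENT_KEY, from the User-Agent header
        "p_src_ip": "127.0.0.1",                  // SOURCE_IP_KEY, from connection info
        "environment": "dev",                     // from the x-p-environment header
        "level": "info",                          // original payload fields
        "message": "hello"
    });
    println!("{enriched:#}");
}
```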

src/handlers/http/audit.rs (+2 -1)

```diff
@@ -23,6 +23,7 @@ use actix_web::{
     middleware::Next,
 };
 use actix_web_httpauth::extractors::basic::BasicAuth;
+use http::header::USER_AGENT;
 use ulid::Ulid;
 
 use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
         )
         .with_user_agent(
             req.headers()
-                .get("User-Agent")
+                .get(USER_AGENT)
                 .and_then(|a| a.to_str().ok())
                 .unwrap_or_default(),
         )
```

src/handlers/http/ingest.rs (+63 -17)

```diff
@@ -41,7 +41,7 @@ use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::flatten_and_push_logs;
+use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
@@ -72,6 +72,8 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
+    let p_custom_fields = get_custom_fields_from_header(req);
+
     let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
     PARSEABLE
         .create_stream_if_not_exists(
@@ -81,7 +83,7 @@
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -102,6 +104,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             None,
             SchemaVersion::V0,
             StreamType::Internal,
+            &HashMap::new(),
         )?
         .process()?;
 
@@ -143,8 +146,9 @@ pub async fn handle_otel_logs_ingestion(
             vec![log_source_entry],
         )
         .await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -166,6 +170,7 @@ pub async fn handle_otel_metrics_ingestion(
     if log_source != LogSource::OtelMetrics {
         return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
     }
+
     let stream_name = stream_name.to_str().unwrap().to_owned();
     let log_source_entry = LogSourceEntry::new(
         log_source.clone(),
@@ -182,7 +187,9 @@ pub async fn handle_otel_metrics_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -222,7 +229,9 @@
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -271,7 +280,8 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    let p_custom_fields = get_custom_fields_from_header(req);
+    flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -421,7 +431,13 @@ mod tests {
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -449,7 +465,13 @@ mod tests {
         });
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -481,7 +503,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -513,7 +535,7 @@ mod tests {
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .is_err());
     }
 
@@ -531,7 +553,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
@@ -572,7 +594,13 @@ mod tests {
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -620,7 +648,13 @@ mod tests {
         ]);
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -669,7 +703,7 @@ mod tests {
         );
 
         let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
             .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
@@ -718,7 +752,7 @@ mod tests {
         );
 
         assert!(json::Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0, &HashMap::new())
            .is_err());
     }
 
@@ -758,7 +792,13 @@ mod tests {
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V0,
+                &HashMap::new(),
+            )
             .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -841,7 +881,13 @@ mod tests {
         .unwrap();
 
         let (rb, _) = json::Event::new(flattened_json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                SchemaVersion::V1,
+                &HashMap::new(),
+            )
             .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
```
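
`get_custom_fields_from_header` is imported from `ingest_utils`, another changed file omitted from this view. A hedged sketch of what it might do, reusing the constants added to `src/event/mod.rs` above; the real helper may differ (for example, it presumably also records `p_format` via `FORMAT_KEY`, which this sketch omits):

```rust
use std::collections::HashMap;

use actix_web::HttpRequest;
use http::header::USER_AGENT;

// Key names match the constants added to src/event/mod.rs in this commit.
const USER_AGENT_KEY: &str = "p_user_agent";
const SOURCE_IP_KEY: &str = "p_src_ip";

/// Hypothetical sketch: collect p_user_agent, p_src_ip, and any
/// `x-p-<key>` headers into the map that into_event() later
/// materializes as per-event columns.
pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
    let mut p_custom_fields = HashMap::new();

    // p_user_agent: taken from the standard User-Agent header.
    if let Some(agent) = req.headers().get(USER_AGENT).and_then(|a| a.to_str().ok()) {
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), agent.to_string());
    }

    // p_src_ip: taken from the connection info of the request.
    let conn_info = req.connection_info();
    if let Some(src_ip) = conn_info.realip_remote_addr() {
        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), src_ip.to_string());
    }

    // Every `x-p-<key>: <value>` header becomes a `<key>` field.
    // actix-web normalizes header names to lowercase.
    for (name, value) in req.headers().iter() {
        if let Some(key) = name.as_str().strip_prefix("x-p-") {
            if let Ok(value) = value.to_str() {
                p_custom_fields.insert(key.to_string(), value.to_string());
            }
        }
    }

    p_custom_fields
}
```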
