Commit 6fc4a5b

remove: p-meta and p-tags (#1067)
Remove p-meta and p-tags from the ingestion flow: Parseable will no longer add these fields to the event. Also remove them from static schema creation and from the query flow.
1 parent: 7a797fa · commit: 6fc4a5b

File tree

10 files changed: +29 −221 lines

Diff for: src/event/format/json.rs

+3 −5
@@ -29,16 +29,14 @@ use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;

-use super::{EventFormat, LogSource, Metadata, Tags};
+use super::{EventFormat, LogSource};
 use crate::{
     metadata::SchemaVersion,
     utils::{arrow::get_field, json::flatten_json_body},
 };

 pub struct Event {
     pub data: Value,
-    pub tags: Tags,
-    pub metadata: Metadata,
 }

 impl EventFormat for Event {
@@ -53,7 +51,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         log_source: &LogSource,
-    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
+    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
         let data = flatten_json_body(
             self.data,
             None,
@@ -119,7 +117,7 @@ impl EventFormat for Event {
             ));
         }

-        Ok((value_arr, schema, is_first, self.tags, self.metadata))
+        Ok((value_arr, schema, is_first))
     }

     // Convert the Data type (defined above) to arrow record batch
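The struct change above means call sites no longer populate `tags` and `metadata` when building a `json::Event`, and `to_data` now yields a three-element tuple `(data, schema, is_first)`. A minimal standalone sketch of the new call-site shape (not the actual Parseable handler code; `Event` is re-declared here only so the snippet compiles on its own):

```rust
use serde_json::{json, Value};

// Stand-in for the slimmed-down json::Event after this commit:
// only the raw JSON payload is carried into the ingestion flow.
pub struct Event {
    pub data: Value,
}

fn main() {
    let payload = json!({ "level": "info", "message": "server started" });

    // Before this commit a caller also supplied `tags` and `metadata`
    // strings; now the event is just the payload.
    let event = Event { data: payload };
    println!("{}", event.data);
}
```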

Diff for: src/event/format/mod.rs

+5 −43
@@ -23,24 +23,18 @@ use std::{
 };

 use anyhow::{anyhow, Error as AnyError};
-use arrow_array::{RecordBatch, StringArray};
+use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::DateTime;
 use serde_json::Value;

-use crate::{
-    metadata::SchemaVersion,
-    utils::{self, arrow::get_field},
-};
+use crate::{metadata::SchemaVersion, utils::arrow::get_field};

-use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
+use super::DEFAULT_TIMESTAMP_KEY;

 pub mod json;

 static TIME_FIELD_NAME_PARTS: [&str; 2] = ["time", "date"];
-
-type Tags = String;
-type Metadata = String;
 type EventSchema = Vec<Arc<Field>>;

 /// Source of the logs, used to perform special processing for certain sources
@@ -87,7 +81,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         log_source: &LogSource,
-    ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
+    ) -> Result<(Self::Data, EventSchema, bool), AnyError>;

     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

@@ -99,26 +93,14 @@
         schema_version: SchemaVersion,
         log_source: &LogSource,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first, tags, metadata) = self.to_data(
+        let (data, mut schema, is_first) = self.to_data(
             storage_schema,
             static_schema_flag,
             time_partition,
             schema_version,
             log_source,
         )?;

-        // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
-        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
-            return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
-        };
-
-        if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
-            return Err(anyhow!(
-                "field {} is a reserved field",
-                DEFAULT_METADATA_KEY
-            ));
-        };
-
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
                 "field {} is a reserved field",
@@ -136,16 +118,6 @@
             )),
         );

-        // p_tags and p_metadata are added to the end of the schema
-        let tags_index = schema.len();
-        let metadata_index = tags_index + 1;
-        schema.push(Arc::new(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true)));
-        schema.push(Arc::new(Field::new(
-            DEFAULT_METADATA_KEY,
-            DataType::Utf8,
-            true,
-        )));
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -154,16 +126,6 @@
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
         let rb = Self::decode(data, new_schema.clone())?;
-        let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
-        let metadata_arr =
-            StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
-        // modify the record batch to add fields to respective indexes
-        let rb = utils::arrow::replace_columns(
-            Arc::clone(&new_schema),
-            &rb,
-            &[tags_index, metadata_index],
-            &[Arc::new(tags_arr), Arc::new(metadata_arr)],
-        );

         Ok((rb, is_first))
     }
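For reference, the deleted block above is what used to stamp constant `p_tags` / `p_metadata` string columns onto every row of the decoded batch. A standalone recreation of that removed behaviour, kept here only to document what was dropped (it uses plain arrow-rs calls and `RecordBatch::try_new`; the real code went through Parseable's `utils::arrow::replace_columns`, and `append_constant_columns` is a made-up name):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

// Recreation of the removed logic: repeat the per-event tag/metadata strings
// once per row and append them as two nullable Utf8 columns.
fn append_constant_columns(rb: &RecordBatch, tags: &str, metadata: &str) -> RecordBatch {
    let n = rb.num_rows();
    let tags_arr = StringArray::from_iter_values(std::iter::repeat(tags).take(n));
    let metadata_arr = StringArray::from_iter_values(std::iter::repeat(metadata).take(n));

    // Extend the schema with the two fields formerly reserved as p_tags / p_metadata.
    let mut fields: Vec<Field> = rb.schema().fields().iter().map(|f| (**f).clone()).collect();
    fields.push(Field::new("p_tags", DataType::Utf8, true));
    fields.push(Field::new("p_metadata", DataType::Utf8, true));

    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
    columns.push(Arc::new(tags_arr));
    columns.push(Arc::new(metadata_arr));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).expect("schema matches columns")
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("message", DataType::Utf8, true)]));
    let rb = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef],
    )
    .unwrap();

    let stamped = append_constant_columns(&rb, "env=prod", "{}");
    assert_eq!(stamped.num_columns(), 3);
}
```

With this commit, the batch produced by `Self::decode` is returned as-is instead of being rewritten with these extra columns.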

Diff for: src/event/mod.rs

−2
@@ -33,8 +33,6 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;

 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
-pub const DEFAULT_TAGS_KEY: &str = "p_tags";
-pub const DEFAULT_METADATA_KEY: &str = "p_metadata";

 #[derive(Clone)]
 pub struct Event {
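With `DEFAULT_TAGS_KEY` and `DEFAULT_METADATA_KEY` gone, `p_timestamp` is the only reserved field name left, and it is the only one `into_recordbatch` still rejects in incoming schemas. A simplified, self-contained sketch of that remaining guard (the real check uses `utils::arrow::get_field` over the event schema; `check_reserved_fields` below is illustrative only):

```rust
use arrow_schema::{DataType, Field, TimeUnit};

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

// Illustrative version of the reserved-field guard that survives this commit:
// incoming events may not define p_timestamp themselves.
fn check_reserved_fields(fields: &[Field]) -> Result<(), String> {
    if fields.iter().any(|f| f.name() == DEFAULT_TIMESTAMP_KEY) {
        return Err(format!("field {DEFAULT_TIMESTAMP_KEY} is a reserved field"));
    }
    Ok(())
}

fn main() {
    let fields = vec![
        Field::new("message", DataType::Utf8, true),
        Field::new(
            DEFAULT_TIMESTAMP_KEY,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    ];
    // The payload declares p_timestamp itself, so ingestion would reject it.
    assert!(check_reserved_fields(&fields).is_err());
}
```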
