Skip to content

Commit d350df3

Browse files
de-sh authored and 7h3cyb3rm0nk committed
refactor: capture ingestion time at receive (#1210)
Currently we are capturing ingestion time in many disjoint places, with this PR we bring it to just one point, right after data is received.
1 parent 4896a72 commit d350df3

File tree

5 files changed

+70
-26
lines changed

5 files changed

+70
-26
lines changed

src/connectors/kafka/processor.rs

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl ParseableSinkProcessor {
6868

6969
let (rb, is_first) = batch_json_event.into_recordbatch(
7070
&schema,
71+
Utc::now(),
7172
static_schema_flag,
7273
time_partition.as_ref(),
7374
schema_version,

src/event/format/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use std::{
2626
use anyhow::{anyhow, Error as AnyError};
2727
use arrow_array::RecordBatch;
2828
use arrow_schema::{DataType, Field, Schema, TimeUnit};
29-
use chrono::DateTime;
29+
use chrono::{DateTime, Utc};
3030
use serde::{Deserialize, Serialize};
3131
use serde_json::Value;
3232

@@ -108,6 +108,7 @@ pub trait EventFormat: Sized {
108108
fn into_recordbatch(
109109
self,
110110
storage_schema: &HashMap<String, Arc<Field>>,
111+
p_timestamp: DateTime<Utc>,
111112
static_schema_flag: bool,
112113
time_partition: Option<&String>,
113114
schema_version: SchemaVersion,
@@ -145,7 +146,7 @@ pub trait EventFormat: Sized {
145146
rb.schema(),
146147
&rb,
147148
&[0],
148-
&[Arc::new(get_timestamp_array(rb.num_rows()))],
149+
&[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
149150
);
150151

151152
Ok((rb, is_first))

src/handlers/http/ingest.rs

+54-16
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7979

8080
pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
8181
let size: usize = body.len();
82-
let parsed_timestamp = Utc::now().naive_utc();
82+
let now = Utc::now();
8383
let (rb, is_first) = {
8484
let body_val: Value = serde_json::from_slice(&body)?;
8585
let hash_map = PARSEABLE.streams.read().unwrap();
@@ -93,15 +93,15 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9393
.clone();
9494
let event = format::json::Event { data: body_val };
9595
// For internal streams, use old schema
96-
event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
96+
event.into_recordbatch(&schema, now, false, None, SchemaVersion::V0)?
9797
};
9898
event::Event {
9999
rb,
100100
stream_name,
101101
origin_format: "json",
102102
origin_size: size as u64,
103103
is_first_event: is_first,
104-
parsed_timestamp,
104+
parsed_timestamp: now.naive_utc(),
105105
time_partition: None,
106106
custom_partition_values: HashMap::new(),
107107
stream_type: StreamType::Internal,
@@ -351,6 +351,7 @@ mod tests {
351351
use arrow::datatypes::Int64Type;
352352
use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
353353
use arrow_schema::{DataType, Field};
354+
use chrono::Utc;
354355
use serde_json::json;
355356
use std::{collections::HashMap, sync::Arc};
356357

@@ -392,8 +393,15 @@ mod tests {
392393
"b": "hello",
393394
});
394395

395-
let (rb, _) =
396-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
396+
let (rb, _) = into_event_batch(
397+
json,
398+
HashMap::default(),
399+
Utc::now(),
400+
false,
401+
None,
402+
SchemaVersion::V0,
403+
)
404+
.unwrap();
397405

398406
assert_eq!(rb.num_rows(), 1);
399407
assert_eq!(rb.num_columns(), 4);
@@ -419,8 +427,15 @@ mod tests {
419427
"c": null
420428
});
421429

422-
let (rb, _) =
423-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
430+
let (rb, _) = into_event_batch(
431+
json,
432+
HashMap::default(),
433+
Utc::now(),
434+
false,
435+
None,
436+
SchemaVersion::V0,
437+
)
438+
.unwrap();
424439

425440
assert_eq!(rb.num_rows(), 1);
426441
assert_eq!(rb.num_columns(), 3);
@@ -450,7 +465,8 @@ mod tests {
450465
.into_iter(),
451466
);
452467

453-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
468+
let (rb, _) =
469+
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
454470

455471
assert_eq!(rb.num_rows(), 1);
456472
assert_eq!(rb.num_columns(), 3);
@@ -480,7 +496,9 @@ mod tests {
480496
.into_iter(),
481497
);
482498

483-
assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
499+
assert!(
500+
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
501+
);
484502
}
485503

486504
#[test]
@@ -496,7 +514,8 @@ mod tests {
496514
.into_iter(),
497515
);
498516

499-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
517+
let (rb, _) =
518+
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
500519

501520
assert_eq!(rb.num_rows(), 1);
502521
assert_eq!(rb.num_columns(), 1);
@@ -535,8 +554,15 @@ mod tests {
535554
},
536555
]);
537556

538-
let (rb, _) =
539-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
557+
let (rb, _) = into_event_batch(
558+
json,
559+
HashMap::default(),
560+
Utc::now(),
561+
false,
562+
None,
563+
SchemaVersion::V0,
564+
)
565+
.unwrap();
540566

541567
assert_eq!(rb.num_rows(), 3);
542568
assert_eq!(rb.num_columns(), 4);
@@ -582,8 +608,15 @@ mod tests {
582608
},
583609
]);
584610

585-
let (rb, _) =
586-
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
611+
let (rb, _) = into_event_batch(
612+
json,
613+
HashMap::default(),
614+
Utc::now(),
615+
false,
616+
None,
617+
SchemaVersion::V0,
618+
)
619+
.unwrap();
587620

588621
assert_eq!(rb.num_rows(), 3);
589622
assert_eq!(rb.num_columns(), 4);
@@ -630,7 +663,8 @@ mod tests {
630663
.into_iter(),
631664
);
632665

633-
let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
666+
let (rb, _) =
667+
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0).unwrap();
634668

635669
assert_eq!(rb.num_rows(), 3);
636670
assert_eq!(rb.num_columns(), 4);
@@ -677,7 +711,9 @@ mod tests {
677711
.into_iter(),
678712
);
679713

680-
assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
714+
assert!(
715+
into_event_batch(json, schema, Utc::now(), false, None, SchemaVersion::V0,).is_err()
716+
);
681717
}
682718

683719
#[test]
@@ -718,6 +754,7 @@ mod tests {
718754
let (rb, _) = into_event_batch(
719755
flattened_json,
720756
HashMap::default(),
757+
Utc::now(),
721758
false,
722759
None,
723760
SchemaVersion::V0,
@@ -806,6 +843,7 @@ mod tests {
806843
let (rb, _) = into_event_batch(
807844
flattened_json,
808845
HashMap::default(),
846+
Utc::now(),
809847
false,
810848
None,
811849
SchemaVersion::V1,

src/handlers/http/modal/utils/ingest_utils.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ async fn push_logs(
9696
let static_schema_flag = stream.get_static_schema_flag();
9797
let custom_partition = stream.get_custom_partition();
9898
let schema_version = stream.get_schema_version();
99+
let p_timestamp = Utc::now();
99100

100101
let data = if time_partition.is_some() || custom_partition.is_some() {
101102
convert_array_to_object(
@@ -121,7 +122,7 @@ async fn push_logs(
121122
let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
122123
let parsed_timestamp = match time_partition.as_ref() {
123124
Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
124-
_ => Utc::now().naive_utc(),
125+
_ => p_timestamp.naive_utc(),
125126
};
126127
let custom_partition_values = match custom_partition.as_ref() {
127128
Some(custom_partition) => {
@@ -144,6 +145,7 @@ async fn push_logs(
144145
let (rb, is_first_event) = into_event_batch(
145146
value,
146147
schema,
148+
p_timestamp,
147149
static_schema_flag,
148150
time_partition.as_ref(),
149151
schema_version,
@@ -168,12 +170,14 @@ async fn push_logs(
168170
pub fn into_event_batch(
169171
data: Value,
170172
schema: HashMap<String, Arc<Field>>,
173+
p_timestamp: DateTime<Utc>,
171174
static_schema_flag: bool,
172175
time_partition: Option<&String>,
173176
schema_version: SchemaVersion,
174177
) -> Result<(arrow_array::RecordBatch, bool), PostError> {
175178
let (rb, is_first) = json::Event { data }.into_recordbatch(
176179
&schema,
180+
p_timestamp,
177181
static_schema_flag,
178182
time_partition,
179183
schema_version,

src/utils/arrow/mod.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use std::sync::Arc;
4545
use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
4646
use arrow_schema::Schema;
4747
use arrow_select::take::take;
48-
use chrono::Utc;
48+
use chrono::{DateTime, Utc};
4949
use itertools::Itertools;
5050

5151
pub mod batch_adapter;
@@ -133,8 +133,8 @@ pub fn get_field<'a>(
133133
/// # Returns
134134
///
135135
/// A column in arrow, containing the current timestamp in millis.
136-
pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
137-
TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size)
136+
pub fn get_timestamp_array(p_timestamp: DateTime<Utc>, size: usize) -> TimestampMillisecondArray {
137+
TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), size)
138138
}
139139

140140
pub fn reverse(rb: &RecordBatch) -> RecordBatch {
@@ -196,19 +196,19 @@ mod tests {
196196
#[test]
197197
fn test_timestamp_array_has_correct_size_and_value() {
198198
let size = 5;
199-
let now = Utc::now().timestamp_millis();
199+
let now = Utc::now();
200200

201-
let array = get_timestamp_array(size);
201+
let array = get_timestamp_array(now, size);
202202

203203
assert_eq!(array.len(), size);
204204
for i in 0..size {
205-
assert!(array.value(i) >= now);
205+
assert!(array.value(i) >= now.timestamp_millis());
206206
}
207207
}
208208

209209
#[test]
210210
fn test_timestamp_array_with_zero_size() {
211-
let array = get_timestamp_array(0);
211+
let array = get_timestamp_array(Utc::now(), 0);
212212

213213
assert_eq!(array.len(), 0);
214214
assert!(array.is_empty());

0 commit comments

Comments
 (0)