diff --git a/src/event/mod.rs b/src/event/mod.rs
index 1178c7138..f45c17a2b 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -58,7 +58,6 @@ impl Event {
                 key.push_str(&format!("&{k}={v}"));
             }
         }
-
         if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }
diff --git a/src/staging/reader.rs b/src/staging/reader.rs
index 6df0dc324..f7710135f 100644
--- a/src/staging/reader.rs
+++ b/src/staging/reader.rs
@@ -84,15 +84,37 @@ pub struct MergedReverseRecordReader {
 impl MergedReverseRecordReader {
     pub fn try_new(files: &[PathBuf]) -> Self {
         let mut readers = Vec::with_capacity(files.len());
-        for file in files {
-            let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
-                error!("Invalid file detected, ignoring it: {:?}", file);
-                continue;
-            };
-            readers.push(reader);
+        for file in files {
+            match File::open(file) {
+                Ok(mut f) => {
+                    // Create two readers - one for counting and one for keeping.
+                    // NOTE: handles from `try_clone` share a single cursor, so
+                    // the counting pass leaves it at EOF; rewind below before
+                    // building the reader that is kept for actual processing.
+                    let Ok(count_reader) = get_reverse_reader(f.try_clone().unwrap()) else {
+                        error!("Invalid file detected, ignoring it: {:?}", file);
+                        continue;
+                    };
+
+                    // Count records from the first reader by summing rows per
+                    // batch (`count()` would count batches, not records)
+                    let count: usize = count_reader
+                        .flat_map(|r| r.ok())
+                        .map(|b| b.num_rows())
+                        .sum();
+
+                    println!("File {:?} has {} records", file, count);
+
+                    if f.seek(SeekFrom::Start(0)).is_err() {
+                        error!("Failed to rewind file, ignoring it: {:?}", file);
+                        continue;
+                    }
+
+                    // Keep the second reader for actual processing
+                    match get_reverse_reader(f) {
+                        Ok(keep_reader) => readers.push(keep_reader),
+                        Err(_) => {
+                            error!("Invalid file detected, ignoring it: {:?}", file);
+                            continue;
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to open file {:?}: {}", file, e);
+                    continue;
+                }
+            }
         }
-
+
+        println!("Total valid readers: {}", readers.len());
         Self { readers }
     }
@@ -248,18 +270,30 @@ pub fn get_reverse_reader(
 ) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
     let mut offset = 0;
     let mut messages = Vec::new();
+    let mut record_count = 0;
     while let Some(res) = find_limit_and_type(&mut reader).transpose() {
         match res {
             Ok((header, size)) => {
+                // Log message type and size
+                // println!("Message type: {:?}, size: {}", header, size);
                 messages.push((header, offset, size));
+                if header == MessageHeader::RecordBatch {
+                    record_count += 1;
+                }
                 offset += size;
             }
-            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
+            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
+                println!("Reached EOF after {} record batches", record_count);
+                break;
+            }
             Err(err) => return Err(err),
         }
     }
+    println!("Total record batches found: {}", record_count);
+    println!("Total messages found: {}", messages.len());
+
     // reverse everything leaving the first because it has schema message.
     messages[1..].reverse();
     let messages = messages
@@ -273,7 +307,6 @@ pub fn get_reverse_reader(
     Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
 }
 
-// return limit for
 fn find_limit_and_type(
     reader: &mut (impl Read + Seek),
 ) -> Result<Option<(MessageHeader, usize)>, io::Error> {
@@ -281,6 +314,51 @@ fn find_limit_and_type(
     let mut size = 0;
     let marker = reader.read_u32::<LittleEndian>()?;
     size += 4;
+    if marker != 0xFFFFFFFF {
+        println!("Invalid marker: {:x}", marker);
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            format!("Invalid Continuation Marker: {:x}", marker),
+        ));
+    }
+
+    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
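+    // Arrow IPC stream framing: each message is a 0xFFFFFFFF continuation
+    // marker, a u32 metadata length (zero marks end-of-stream), the
+    // flatbuffer metadata itself, then the body padded to an 8-byte boundary.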
+    size += 4;
+
+    if metadata_size == 0x00000000 {
+        println!("Found end of stream (metadata_size = 0)");
+        return Ok(None);
+    }
+
+    // println!("Metadata size: {}", metadata_size);
+
+    let mut message = vec![0u8; metadata_size];
+    reader.read_exact(&mut message)?;
+    size += metadata_size;
+
+    let message = unsafe { root_as_message_unchecked(&message) };
+    let header = message.header_type();
+    let message_size = message.bodyLength();
+
+    // println!("Message header: {:?}, body length: {}", header, message_size);
+
+    size += message_size as usize;
+
+    // Seek past the message body and the padding that aligns the message
+    // to the next 8-byte boundary
+    let padding = (8 - (size % 8)) % 8;
+    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
+    size += padding;
+
+    Ok(Some((header, size)))
+}
+
+// Previous implementation of `find_limit_and_type`, kept around for reference
+#[allow(dead_code)]
+fn find_limit_and_type1(
+    reader: &mut (impl Read + Seek),
+) -> Result<Option<(MessageHeader, usize)>, io::Error> {
+    let mut size = 0;
+    let marker = reader.read_u32::<LittleEndian>()?;
+    size += 4;
+
     if marker != 0xFFFFFFFF {
         return Err(io::Error::new(
             io::ErrorKind::InvalidData,
diff --git a/src/staging/streams.rs b/src/staging/streams.rs
index e04b3f744..aae7e0792 100644
--- a/src/staging/streams.rs
+++ b/src/staging/streams.rs
@@ -28,7 +28,7 @@ use std::{
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::Schema;
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{DateTime, Datelike, NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -141,8 +141,7 @@ impl<'a> Stream<'a> {
             hostname.push_str(&INGESTOR_META.get_ingestor_id());
         }
         let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -160,11 +159,11 @@ impl<'a> Stream<'a> {
             return vec![];
         };
 
-        let paths = dir
+        let paths: Vec<PathBuf> = dir
             .flatten()
             .map(|file| file.path())
             .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
-            .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
+            // NOTE: creation time is not available on every platform/filesystem
+            .sorted_by_key(|f| f.metadata().unwrap().created().unwrap())
             .collect();
 
         paths
@@ -177,24 +176,32 @@ impl<'a> Stream<'a> {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: NaiveDateTime,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
+        let now = Utc::now();
+
+        // Extract date and time components of current time
+        let now_date = (now.year(), now.month(), now.day());
+        let now_time = (now.hour(), now.minute());
+
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
-
-        // if the shutdown signal is false i.e. normal condition
-        // don't keep the ones for the current minute
-        if !shutdown_signal {
-            arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
-            });
-        }
+        arrow_files.retain(|path| {
+            let created_at = path.metadata().unwrap().created().unwrap();
+            let created_at: DateTime<Utc> = created_at.into();
+            let created_date = (created_at.year(), created_at.month(), created_at.day());
+            let created_time = (created_at.hour(), created_at.minute());
+
+            let same_date = now_date == created_date;
+            let same_time = now_time == created_time;
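+            // The arrow filename no longer embeds a %Y%m%dT%H%M prefix (see
+            // the format! change above), so recency is judged from the file's
+            // creation time rather than its name.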
+            // if the shutdown signal is false, i.e. normal condition,
+            // don't keep the ones for the current minute
+            if !shutdown_signal {
+                !same_date || !same_time
+            } else {
+                true
+            }
+        });
 
         let random_string =
             rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
@@ -370,8 +377,7 @@ impl<'a> Stream<'a> {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
 
-        let time = chrono::Utc::now().naive_utc();
-        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
+        let staging_files = self.arrow_files_grouped_exclude_time(shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
@@ -400,9 +406,12 @@ impl<'a> Stream<'a> {
         }
 
         let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
+        println!("Number of valid readers: {}", record_reader.readers.len());
+
         if record_reader.readers.is_empty() {
             continue;
         }
+
         let merged_schema = record_reader.merged_schema();
 
         let props = parquet_writer_props(
@@ -422,9 +431,13 @@ impl<'a> Stream<'a> {
                 .open(&part_path)
                 .map_err(|_| StagingError::Create)?;
             let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
+            let mut input_count = 0;
             for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
+                let batch_size = record.num_rows();
                 writer.write(record)?;
+                input_count += batch_size;
             }
+            println!("Total input count: {}", input_count);
             writer.close()?;
 
             if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {