fix for converting arrows to parquet #1182

Open · wants to merge 3 commits into base: main
1 change: 0 additions & 1 deletion src/event/mod.rs
@@ -58,7 +58,6 @@ impl Event {
                key.push_str(&format!("&{k}={v}"));
            }
        }

        if self.is_first_event {
            commit_schema(&self.stream_name, self.rb.schema())?;
        }
96 changes: 87 additions & 9 deletions src/staging/reader.rs
@@ -84,15 +84,37 @@ pub struct MergedReverseRecordReader {
impl MergedReverseRecordReader {
    pub fn try_new(files: &[PathBuf]) -> Self {
        let mut readers = Vec::with_capacity(files.len());
        for file in files {
            let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
                error!("Invalid file detected, ignoring it: {:?}", file);
                continue;
            };

            readers.push(reader);
        for file in files {
            match File::open(file) {
                Ok(file) => {
                    // Create two readers: one for counting and one to keep
                    match (
                        get_reverse_reader(file.try_clone().unwrap()),
                        get_reverse_reader(file.try_clone().unwrap()),
                    ) {
                        (Ok(count_reader), Ok(keep_reader)) => {
                            // Sum rows across all batches from the first reader
                            // (`sum`, not `count`: `count` would tally batches)
                            let count: usize = count_reader
                                .flat_map(|r| r.ok())
                                .map(|b| b.num_rows())
                                .sum();

                            println!("File {:?} has {} records", file, count);

                            // Keep the second reader for actual processing
                            readers.push(keep_reader);
                        }
                        _ => {
                            error!("Invalid file detected, ignoring it: {:?}", file);
                            continue;
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to open file {:?}: {}", file, e);
                    continue;
                }
            }
        }

        println!("Total valid readers: {}", readers.len());
        Self { readers }
    }
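Review note: per std's documentation, `File::try_clone` returns a handle that shares a single cursor with the original, so the indexing pass inside the first `get_reverse_reader` call can leave the clone handed to the second call already at EOF. A sketch of the same counting idea with two independent handles (the helper name is mine, not part of this PR):

```rust
use std::{fs::File, io, path::Path};

// Hypothetical helper: count rows from one handle, keep a second,
// independent handle for merging. Opening the file twice avoids
// `try_clone`, whose clones share one cursor with the original.
fn reader_with_row_count(
    path: &Path,
) -> io::Result<(StreamReader<BufReader<OffsetReader<File>>>, usize)> {
    let count_reader = get_reverse_reader(File::open(path)?)?;
    // `.sum()` totals rows across batches; `.count()` would tally batches.
    let rows: usize = count_reader
        .flat_map(|r| r.ok())
        .map(|b| b.num_rows())
        .sum();
    let keep_reader = get_reverse_reader(File::open(path)?)?;
    Ok((keep_reader, rows))
}
```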

@@ -248,18 +270,30 @@ pub fn get_reverse_reader<T: Read + Seek>(
) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
    let mut offset = 0;
    let mut messages = Vec::new();
    let mut record_count = 0;

    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
        match res {
            Ok((header, size)) => {
                // Log message type and size
                // println!("Message type: {:?}, size: {}", header, size);
                messages.push((header, offset, size));
                if header == MessageHeader::RecordBatch {
                    record_count += 1;
                }
                offset += size;
            }
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
                println!("Reached EOF after {} record batches", record_count);
                break;
            }
            Err(err) => return Err(err),
        }
    }

    println!("Total record batches found: {}", record_count);
    println!("Total messages found: {}", messages.len());

    // Reverse everything except the first message, which carries the schema.
    messages[1..].reverse();
    let messages = messages
@@ -273,14 +307,58 @@ pub fn get_reverse_reader<T: Read + Seek>(
    Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
}

// Returns the header type and total byte size of the next IPC message,
// or None once the end-of-stream marker (a zero metadata length) is hit.
fn find_limit_and_type(
    reader: &mut (impl Read + Seek),
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
    let mut size = 0;
    let marker = reader.read_u32::<LittleEndian>()?;
    size += 4;

    if marker != 0xFFFFFFFF {
        println!("Invalid marker: {:x}", marker);
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Invalid Continuation Marker: {:x}", marker),
        ));
    }

    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
    size += 4;

    if metadata_size == 0x00000000 {
        println!("Found end of stream (metadata_size = 0)");
        return Ok(None);
    }

    // println!("Metadata size: {}", metadata_size);

    let mut message = vec![0u8; metadata_size];
    reader.read_exact(&mut message)?;
    size += metadata_size;

    let message = unsafe { root_as_message_unchecked(&message) };
    let header = message.header_type();
    let message_size = message.bodyLength();

    // println!("Message header: {:?}, body length: {}", header, message_size);

    size += message_size as usize;

    let padding = (8 - (size % 8)) % 8;
    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
    size += padding;

    Ok(Some((header, size)))
}
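For context, the framing parsed above matches my reading of the Arrow IPC encapsulated message format: a `0xFFFFFFFF` continuation marker, a little-endian metadata length, the flatbuffer metadata, then the body, with messages kept 8-byte aligned. The alignment arithmetic used by `find_limit_and_type` can be sanity-checked in isolation:

```rust
// Mirrors the `(8 - (size % 8)) % 8` expression above: the number of
// filler bytes needed to reach the next 8-byte boundary.
fn pad_to_8(n: usize) -> usize {
    (8 - (n % 8)) % 8
}

fn main() {
    assert_eq!(pad_to_8(8), 0);  // already aligned
    assert_eq!(pad_to_8(13), 3); // 13 + 3 = 16
    assert_eq!(pad_to_8(0), 0);
}
```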

// Returns the header type and total byte size of the next IPC message;
// earlier variant of `find_limit_and_type` without the debug output.
fn find_limit_and_type1(
    reader: &mut (impl Read + Seek),
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
    let mut size = 0;
    let marker = reader.read_u32::<LittleEndian>()?;
    size += 4;

    if marker != 0xFFFFFFFF {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
55 changes: 34 additions & 21 deletions src/staging/streams.rs
@@ -28,7 +28,7 @@ use std::{
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::Schema;
use chrono::{NaiveDateTime, Timelike, Utc};
use chrono::{DateTime, Datelike, NaiveDateTime, Timelike, Utc};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
@@ -141,8 +141,7 @@ impl<'a> Stream<'a> {
            hostname.push_str(&INGESTOR_META.get_ingestor_id());
        }
        let filename = format!(
            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
            Utc::now().format("%Y%m%dT%H%M"),
            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
            parsed_timestamp.date(),
            parsed_timestamp.hour(),
            minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -160,11 +159,11 @@ impl<'a> Stream<'a> {
            return vec![];
        };

        let paths = dir
        let paths: Vec<PathBuf> = dir
            .flatten()
            .map(|file| file.path())
            .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
            .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
            .sorted_by_key(|f| f.metadata().unwrap().created().unwrap())
            .collect();

        paths
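One caveat on switching the sort key from `modified()` to `created()`: `std::fs::Metadata::created()` returns an `Err` on platforms and filesystems that don't record a birth time, so the `.unwrap()` can panic there. A fallback sketch (the helper name is mine):

```rust
use std::{fs::Metadata, time::SystemTime};

// Prefer the creation time, but fall back to the modification time (and
// finally the epoch) on filesystems without a recorded birth time.
fn created_or_modified(meta: &Metadata) -> SystemTime {
    meta.created()
        .or_else(|_| meta.modified())
        .unwrap_or(SystemTime::UNIX_EPOCH)
}
```

which would slot in as `.sorted_by_key(|f| created_or_modified(&f.metadata().unwrap()))`.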
@@ -177,24 +176,32 @@ impl<'a> Stream<'a> {
    /// Only includes ones starting from the previous minute
    pub fn arrow_files_grouped_exclude_time(
        &self,
        exclude: NaiveDateTime,
        shutdown_signal: bool,
    ) -> HashMap<PathBuf, Vec<PathBuf>> {
        let now = Utc::now();

        // Extract date and time components of current time
        let now_date = (now.year(), now.month(), now.day());
        let now_time = (now.hour(), now.minute());

        let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
        let mut arrow_files = self.arrow_files();

        // if the shutdown signal is false i.e. normal condition
        // don't keep the ones for the current minute
        if !shutdown_signal {
            arrow_files.retain(|path| {
                !path
                    .file_name()
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
            });
        }
        arrow_files.retain(|path| {
            let created_at = path.metadata().unwrap().created().unwrap();
            let created_at: DateTime<Utc> = created_at.into();
            let created_date = (created_at.year(), created_at.month(), created_at.day());
            let created_time = (created_at.hour(), created_at.minute());

            let same_date = now_date == created_date;
            let same_time = now_time == created_time;
            // if the shutdown signal is false i.e. normal condition
            // don't keep the ones for the current minute
            if !shutdown_signal {
                !same_date || !same_time
            } else {
                true
            }
        });

        let random_string =
            rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
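The replacement retention logic boils down to one predicate: under normal operation, drop files created in the current calendar minute (they may still be receiving writes); on shutdown, keep everything. A condensed sketch using the same chrono types (the helper is hypothetical):

```rust
use chrono::{DateTime, Datelike, Timelike, Utc};

// True when `created_at` falls in the same calendar minute as `now`.
fn in_current_minute(created_at: DateTime<Utc>, now: DateTime<Utc>) -> bool {
    (created_at.year(), created_at.month(), created_at.day(),
     created_at.hour(), created_at.minute())
        == (now.year(), now.month(), now.day(), now.hour(), now.minute())
}
```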
@@ -370,8 +377,7 @@ impl<'a> Stream<'a> {
    ) -> Result<Option<Schema>, StagingError> {
        let mut schemas = Vec::new();

        let time = chrono::Utc::now().naive_utc();
        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
        let staging_files = self.arrow_files_grouped_exclude_time(shutdown_signal);
        if staging_files.is_empty() {
            metrics::STAGING_FILES
                .with_label_values(&[&self.stream_name])
@@ -400,9 +406,12 @@ impl<'a> Stream<'a> {
            }

            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
            println!("Number of valid readers: {}", record_reader.readers.len());

            if record_reader.readers.is_empty() {
                continue;
            }

            let merged_schema = record_reader.merged_schema();

            let props = parquet_writer_props(
@@ -422,9 +431,13 @@ impl<'a> Stream<'a> {
                .open(&part_path)
                .map_err(|_| StagingError::Create)?;
            let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?;
            let mut input_count = 0;
            for ref record in record_reader.merged_iter(schema, time_partition.cloned()) {
                let batch_size = record.num_rows();
                writer.write(record)?;
                input_count += batch_size;
            }
            println!("Total input count: {}", input_count);
            writer.close()?;

            if part_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 {
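If `input_count` is meant to verify that no rows are lost during conversion, one way to close the loop (a sketch, not part of this PR) is to read the row count back from the finished parquet footer and compare it against the rows fed to the `ArrowWriter`:

```rust
use std::{fs::File, path::Path};

use parquet::file::reader::{FileReader, SerializedFileReader};

// Row count as recorded in the parquet footer; panics on a missing or
// malformed file, which is acceptable for a debugging assertion.
fn parquet_row_count(path: &Path) -> i64 {
    let file = File::open(path).expect("parquet file exists");
    let reader = SerializedFileReader::new(file).expect("valid parquet footer");
    reader.metadata().file_metadata().num_rows()
}
```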