Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: arrow_path_to_parquet #1239

Merged
merged 10 commits into from
Mar 24, 2025
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/main' into regex
de-sh committed Mar 21, 2025
commit 1d817623691a3e8bbd5a9f14d4108328e9473a8a
111 changes: 110 additions & 1 deletion src/parseable/streams.rs
Original file line number Diff line number Diff line change
@@ -836,7 +836,7 @@ impl Streams {

#[cfg(test)]
mod tests {
use std::{io::Write, time::Duration};
use std::{io::Write, sync::Barrier, thread::spawn, time::Duration};

use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, TimeUnit};
@@ -1301,4 +1301,113 @@ mod tests {

assert!(result.is_none());
}

#[test]
fn get_or_create_returns_existing_stream() {
let streams = Streams::default();
let options = Arc::new(Options::default());
let stream_name = "test_stream";
let metadata = LogStreamMetadata::default();
let ingestor_id = Some("test_ingestor".to_owned());

// Create the stream first
let stream1 = streams.get_or_create(
options.clone(),
stream_name.to_owned(),
metadata.clone(),
ingestor_id.clone(),
);

// Call get_or_create again with the same stream_name
let stream2 = streams.get_or_create(
options.clone(),
stream_name.to_owned(),
metadata.clone(),
ingestor_id.clone(),
);

// Assert that both references point to the same stream
assert!(Arc::ptr_eq(&stream1, &stream2));

// Verify the map contains only one entry
let guard = streams.read().expect("Failed to acquire read lock");
assert_eq!(guard.len(), 1);
}

#[test]
fn create_and_return_new_stream_when_name_does_not_exist() {
let streams = Streams::default();
let options = Arc::new(Options::default());
let stream_name = "new_stream";
let metadata = LogStreamMetadata::default();
let ingestor_id = Some("new_ingestor".to_owned());

// Assert the stream doesn't exist already
let guard = streams.read().expect("Failed to acquire read lock");
assert_eq!(guard.len(), 0);
assert!(!guard.contains_key(stream_name));
drop(guard);

// Call get_or_create with a new stream_name
let stream = streams.get_or_create(
options.clone(),
stream_name.to_owned(),
metadata.clone(),
ingestor_id.clone(),
);

// verify created stream has the same ingestor_id
assert_eq!(stream.ingestor_id, ingestor_id);

// Assert that the stream is created
let guard = streams.read().expect("Failed to acquire read lock");
assert_eq!(guard.len(), 1);
assert!(guard.contains_key(stream_name));
}

#[test]
fn get_or_create_stream_concurrently() {
let streams = Arc::new(Streams::default());
let options = Arc::new(Options::default());
let stream_name = String::from("concurrent_stream");
let metadata = LogStreamMetadata::default();
let ingestor_id = Some(String::from("concurrent_ingestor"));

// Barrier to synchronize threads
let barrier = Arc::new(Barrier::new(2));

// Clones for the first thread
let streams1 = Arc::clone(&streams);
let options1 = Arc::clone(&options);
let barrier1 = Arc::clone(&barrier);
let stream_name1 = stream_name.clone();
let metadata1 = metadata.clone();
let ingestor_id1 = ingestor_id.clone();

// First thread
let handle1 = spawn(move || {
barrier1.wait();
streams1.get_or_create(options1, stream_name1, metadata1, ingestor_id1)
});

// Cloned for the second thread
let streams2 = Arc::clone(&streams);

// Second thread
let handle2 = spawn(move || {
barrier.wait();
streams2.get_or_create(options, stream_name, metadata, ingestor_id)
});

// Wait for both threads to complete and get their results
let stream1 = handle1.join().expect("Thread 1 panicked");
let stream2 = handle2.join().expect("Thread 2 panicked");

// Assert that both references point to the same stream
assert!(Arc::ptr_eq(&stream1, &stream2));

// Verify the map contains only one entry
let guard = streams.read().expect("Failed to acquire read lock");
assert_eq!(guard.len(), 1);
}
}
You are viewing a condensed version of this merge commit. You can view the full changes here.