Skip to content

Commit

Permalink
Rework concurrency in test, use Arrow Future
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Dec 12, 2024
1 parent ec9b054 commit 6508c5a
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include <future>
#include <string_view>

#include "gtest/gtest.h"
Expand All @@ -33,6 +32,8 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"
#include "parquet/arrow/reader.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
Expand Down Expand Up @@ -166,22 +167,35 @@ class DatasetEncryptionTestBase : public testing::TestWithParam<CompressionParam
// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

std::vector<std::future<Result<std::shared_ptr<Table>>>> threads;
if (concurrently) {
// start with a single thread so we are more likely to build up a queue of jobs
ASSERT_OK_AND_ASSIGN(auto pool, arrow::internal::ThreadPool::Make(1));
std::vector<Future<std::shared_ptr<Table>>> threads;

// Read the dataset above multiple times concurrently to check that reading it is thread-safe.
for (size_t i = 0; i < 100; ++i) {
threads.push_back(
DeferNotOk(pool->Submit(DatasetEncryptionTestBase::read, dataset)));
}

// Read the dataset above multiple times concurrently to check that reading it is thread-safe.
// Reuse the dataset above to scan it twice to make sure decryption works correctly.
const size_t attempts = concurrently ? 1000 : 2;
for (size_t i = 0; i < attempts; ++i) {
if (concurrently) {
threads.push_back(std::async(DatasetEncryptionTestBase::read, dataset));
} else {
ASSERT_OK_AND_ASSIGN(auto read_table, read(dataset));
// ramp up parallelism
ASSERT_OK(pool->SetCapacity(16));
// ensure there are sufficient jobs to see concurrent processing
ASSERT_GT(pool->GetNumTasks(), 16);
printf("%d", pool->GetNumTasks());

// wait for all jobs to finish
pool->WaitForIdle();

// assert correctness of jobs
for (auto& thread : threads) {
ASSERT_OK_AND_ASSIGN(auto read_table, thread.result());
AssertTablesEqual(*read_table, *table_);
}
}
if (concurrently) {
for (auto& thread : threads) {
ASSERT_OK_AND_ASSIGN(auto read_table, thread.get());
} else {
// Reuse the dataset above to scan it twice to make sure decryption works correctly.
for (size_t i = 0; i < 2; ++i) {
ASSERT_OK_AND_ASSIGN(auto read_table, read(dataset));
AssertTablesEqual(*read_table, *table_);
}
}
Expand Down

0 comments on commit 6508c5a

Please sign in to comment.