From 6cd73d521c965bb784f48b1a4d63f51af7bc4862 Mon Sep 17 00:00:00 2001 From: JOBIN-SABU <85jobinsabu@gmail.com> Date: Thu, 6 Feb 2025 12:12:07 +0530 Subject: [PATCH 1/5] Fix issue #45394: Handle single-line JSON without line ending --- cpp/CMakeLists.txt | 43 ++++++++++++++++ cpp/src/arrow/dataset/file_json.cc | 43 +++++++++++++--- cpp/tests/CMakeLists.txt | 13 +++++ cpp/tests/test_file_json.cc | 81 ++++++++++++++++++++++++++++++ test.json | 1 + 5 files changed, 175 insertions(+), 6 deletions(-) create mode 100644 cpp/tests/CMakeLists.txt create mode 100644 cpp/tests/test_file_json.cc create mode 100644 test.json diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a7d80c2e96c23..5a6d7608fe549 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -760,3 +760,46 @@ config_summary_message() if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() + +# Project name +project(ArrowJson) + +# Add your source files and libraries +add_subdirectory(src) + +# Add Google Test +add_subdirectory(${CMAKE_SOURCE_DIR}/googletest) + +# Include directories +include_directories(${CMAKE_SOURCE_DIR}/src) + +# Add the test directory +add_subdirectory(tests) + +# Enable testing +enable_testing() + +# Add custom target for tags (if needed) +if(UNIX) + add_custom_target(tags + etags + --members + --declarations + `find + ${CMAKE_CURRENT_SOURCE_DIR}/src + -name + \\*.cc + -or + -name + \\*.hh + -or + -name + \\*.cpp + -or + -name + \\*.h + -or + -name + \\*.hpp`) +endif() + \ No newline at end of file diff --git a/cpp/src/arrow/dataset/file_json.cc b/cpp/src/arrow/dataset/file_json.cc index 1d545c3969f6a..2e2264af1fd81 100644 --- a/cpp/src/arrow/dataset/file_json.cc +++ b/cpp/src/arrow/dataset/file_json.cc @@ -18,6 +18,7 @@ #include "arrow/dataset/file_json.h" #include +#include #include #include @@ -108,16 +109,46 @@ class JsonFragmentScanner : public FragmentScanner { parse_options.unexpected_field_behavior = json::UnexpectedFieldBehavior::Ignore; int64_t block_size = format_options.read_options.block_size; + if (block_size <= 0) { + return Status::Invalid("Block size must be positive"); + } + auto num_batches = static_cast(bit_util::CeilDiv(inspected.num_bytes, block_size)); + if (num_batches < 0) { + return Status::Invalid("Number of batches calculation overflowed"); + } auto future = json::StreamingReader::MakeAsync( - inspected.stream, format_options.read_options, parse_options, - io::default_io_context(), cpu_executor); - return future.Then([num_batches, block_size](const ReaderPtr& reader) - -> Result> { - return std::make_shared(reader, num_batches, block_size); - }); + inspected.stream, format_options.read_options, parse_options, + io::default_io_context(), cpu_executor); + +// ✅ Fix: Handle Single-Line JSON Case +return future.Then([num_batches, block_size](const ReaderPtr& reader) + -> Result> { + if (!reader) { + return Status::Invalid("Failed to create JSON Streaming Reader."); + } + + // Check if the input stream has only one JSON object and no newline + // Check if the input stream has only one JSON object and no newline +auto stream_data = inspected.stream->Read(); +if (!stream_data.ok()) { + // Handle the error appropriately + return Status::IOError("Failed to read from the input stream"); +} + +std::string json_content = stream_data->ToString(); +if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue +} + +// Create a new InputStream with fixed content +inspected.stream = std::make_shared(Buffer::FromString(json_content)); + +return std::make_shared(reader, num_batches, block_size); +}); + } private: diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt new file mode 100644 index 0000000000000..bdb096dc60e8e --- /dev/null +++ b/cpp/tests/CMakeLists.txt @@ -0,0 +1,13 @@ +# Add test source files +set(TEST_SOURCES + test_file_json.cc +) + +# Create test executable +add_executable(ArrowJsonTests ${TEST_SOURCES}) + +# Link Google Test and your project libraries +target_link_libraries(ArrowJsonTests gtest gtest_main arrow_dataset arrow_io) + +# Add tests +add_test(NAME ArrowJsonTests COMMAND ArrowJsonTests) \ No newline at end of file diff --git a/cpp/tests/test_file_json.cc b/cpp/tests/test_file_json.cc new file mode 100644 index 0000000000000..83e6395ae60b6 --- /dev/null +++ b/cpp/tests/test_file_json.cc @@ -0,0 +1,81 @@ +#include +#include "arrow/dataset/file_json.h" +#include "arrow/io/memory.h" +#include "arrow/status.h" +#include "arrow/testing/gtest_util.h" + +using namespace arrow; +using namespace arrow::dataset; + +class JsonFragmentScannerTest : public ::testing::Test { + protected: + void SetUp() override { + // Set up necessary objects and state for the tests + } + + void TearDown() override { + // Clean up after tests + } +}; + +TEST_F(JsonFragmentScannerTest, InvalidBlockSize) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = -1; // Invalid block size + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_FALSE(result.ok()); + ASSERT_EQ(result.status().code(), StatusCode::Invalid); + ASSERT_EQ(result.status().message(), "Block size must be positive"); +} + +TEST_F(JsonFragmentScannerTest, ValidBlockSize) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; // Valid block size + inspected.num_bytes = 2048; + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} + +TEST_F(JsonFragmentScannerTest, SingleLineJson) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; + inspected.num_bytes = 1024; + + // Create a single-line JSON input stream + std::string json_content = R"({"key": "value"})"; + inspected.stream = std::make_shared(json_content); + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} + +TEST_F(JsonFragmentScannerTest, MultiLineJson) { + FragmentScanRequest scan_request; + JsonFragmentScanOptions format_options; + JsonInspectedFragment inspected; + Executor* cpu_executor = nullptr; + + format_options.read_options.block_size = 1024; + inspected.num_bytes = 2048; + + // Create a multi-line JSON input stream + std::string json_content = R"({"key1": "value1"} +{"key2": "value2"})"; + inspected.stream = std::make_shared(json_content); + + auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); + ASSERT_TRUE(result.ok()); +} \ No newline at end of file diff --git a/test.json b/test.json new file mode 100644 index 0000000000000..4d92cc4903e77 --- /dev/null +++ b/test.json @@ -0,0 +1 @@ +{"field": 1} \ No newline at end of file From f9b58502c66edddbc0b89fd76538e955a555d710 Mon Sep 17 00:00:00 2001 From: JOBIN-SABU <85jobinsabu@gmail.com> Date: Sat, 8 Feb 2025 09:46:47 +0530 Subject: [PATCH 2/5] Update file_json.cc --- cpp/src/arrow/dataset/file_json.cc | 54 +++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/file_json.cc b/cpp/src/arrow/dataset/file_json.cc index 1d545c3969f6a..7ab4314f7a04c 100644 --- a/cpp/src/arrow/dataset/file_json.cc +++ b/cpp/src/arrow/dataset/file_json.cc @@ -111,13 +111,32 @@ class JsonFragmentScanner : public FragmentScanner { auto num_batches = static_cast(bit_util::CeilDiv(inspected.num_bytes, block_size)); - auto future = json::StreamingReader::MakeAsync( - inspected.stream, format_options.read_options, parse_options, - io::default_io_context(), cpu_executor); - return future.Then([num_batches, block_size](const ReaderPtr& reader) - -> Result> { - return std::make_shared(reader, num_batches, block_size); - }); + auto future = json::( + inspected.stream, format_options.read_options, parse_options, + io::default_io_context(), cpu_executor); + +// ✅ Fix: Handle Single-Line JSON Case +return future.Then([num_batches, block_size](const ReaderPtr& reader) + -> Result> { + if (!reader) { + return Status::Invalid("Failed to create JSON Streaming Reader."); + } + + // Check if the input stream has only one JSON object and no newline + auto stream_data = inspected.stream->Read(); + if (stream_data.ok()) { + std::string json_content = stream_data->ToString(); + if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue + } + + // Create a new InputStream with fixed content + inspected.stream = std::make_shared(Buffer::FromString(json_content)); + } + + return std::make_shared(reader, num_batches, block_size); +}); + } private: @@ -298,8 +317,25 @@ Result> DoOpenReader( json::UnexpectedFieldBehavior::Ignore; } return json::StreamingReader::MakeAsync( - std::move(state->stream), state->read_options, state->parse_options); - }); + std::move(state->stream), state->read_options, state->parse_options) + .Then([](const ReaderPtr& reader) -> Result { + auto stream_data = reader->stream->Read(); + if (!stream_data.ok()) { + return Status::IOError("Failed to read from input stream"); + } + + std::string json_content = stream_data->ToString(); + if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue + } + + // Create a new InputStream with fixed content + reader->stream = std::make_shared(Buffer::FromString(json_content)); + + return reader; + }, [path = source.path()](const Status& error) -> Result { + return error.WithMessage("Could not open JSON input source '", path, "': ", error); + }); ARROW_ASSIGN_OR_RAISE(auto future, maybe_future); return future.Then([](const ReaderPtr& reader) -> Result { return reader; }, [path = source.path()](const Status& error) -> Result { From 665d39c3ffa8b096c1adf8c09e5f388fd01eb6ab Mon Sep 17 00:00:00 2001 From: "(jobin sabu)" <137516443+JOBIN-SABU@users.noreply.github.com> Date: Sat, 8 Feb 2025 13:25:32 +0530 Subject: [PATCH 3/5] Delete test.json --- test.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 test.json diff --git a/test.json b/test.json deleted file mode 100644 index 4d92cc4903e77..0000000000000 --- a/test.json +++ /dev/null @@ -1 +0,0 @@ -{"field": 1} \ No newline at end of file From 2c0ca0444ad12792ef84ee59346ad4d8d2c7e65d Mon Sep 17 00:00:00 2001 From: "(jobin sabu)" <137516443+JOBIN-SABU@users.noreply.github.com> Date: Sat, 8 Feb 2025 18:26:34 +0530 Subject: [PATCH 4/5] Update CMakeLists.txt --- cpp/CMakeLists.txt | 42 ------------------------------------------ 1 file changed, 42 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 5a6d7608fe549..ebe4a1e1c15e6 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -761,45 +761,3 @@ if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() -# Project name -project(ArrowJson) - -# Add your source files and libraries -add_subdirectory(src) - -# Add Google Test -add_subdirectory(${CMAKE_SOURCE_DIR}/googletest) - -# Include directories -include_directories(${CMAKE_SOURCE_DIR}/src) - -# Add the test directory -add_subdirectory(tests) - -# Enable testing -enable_testing() - -# Add custom target for tags (if needed) -if(UNIX) - add_custom_target(tags - etags - --members - --declarations - `find - ${CMAKE_CURRENT_SOURCE_DIR}/src - -name - \\*.cc - -or - -name - \\*.hh - -or - -name - \\*.cpp - -or - -name - \\*.h - -or - -name - \\*.hpp`) -endif() - \ No newline at end of file From bafd6b70398626df72211de2fd0eef84c83e934c Mon Sep 17 00:00:00 2001 From: JOBIN-SABU <85jobinsabu@gmail.com> Date: Wed, 12 Feb 2025 10:21:46 +0530 Subject: [PATCH 5/5] Partial fix for issue #45394 --- cpp/CMakeLists.txt | 42 ------------------- cpp/tests/CMakeLists.txt | 13 ------ cpp/tests/test_file_json.cc | 81 ------------------------------------- 3 files changed, 136 deletions(-) delete mode 100644 cpp/tests/CMakeLists.txt delete mode 100644 cpp/tests/test_file_json.cc diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 5a6d7608fe549..ebe4a1e1c15e6 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -761,45 +761,3 @@ if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() -# Project name -project(ArrowJson) - -# Add your source files and libraries -add_subdirectory(src) - -# Add Google Test -add_subdirectory(${CMAKE_SOURCE_DIR}/googletest) - -# Include directories -include_directories(${CMAKE_SOURCE_DIR}/src) - -# Add the test directory -add_subdirectory(tests) - -# Enable testing -enable_testing() - -# Add custom target for tags (if needed) -if(UNIX) - add_custom_target(tags - etags - --members - --declarations - `find - ${CMAKE_CURRENT_SOURCE_DIR}/src - -name - \\*.cc - -or - -name - \\*.hh - -or - -name - \\*.cpp - -or - -name - \\*.h - -or - -name - \\*.hpp`) -endif() - \ No newline at end of file diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt deleted file mode 100644 index bdb096dc60e8e..0000000000000 --- a/cpp/tests/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -# Add test source files -set(TEST_SOURCES - test_file_json.cc -) - -# Create test executable -add_executable(ArrowJsonTests ${TEST_SOURCES}) - -# Link Google Test and your project libraries -target_link_libraries(ArrowJsonTests gtest gtest_main arrow_dataset arrow_io) - -# Add tests -add_test(NAME ArrowJsonTests COMMAND ArrowJsonTests) \ No newline at end of file diff --git a/cpp/tests/test_file_json.cc b/cpp/tests/test_file_json.cc deleted file mode 100644 index 83e6395ae60b6..0000000000000 --- a/cpp/tests/test_file_json.cc +++ /dev/null @@ -1,81 +0,0 @@ -#include -#include "arrow/dataset/file_json.h" -#include "arrow/io/memory.h" -#include "arrow/status.h" -#include "arrow/testing/gtest_util.h" - -using namespace arrow; -using namespace arrow::dataset; - -class JsonFragmentScannerTest : public ::testing::Test { - protected: - void SetUp() override { - // Set up necessary objects and state for the tests - } - - void TearDown() override { - // Clean up after tests - } -}; - -TEST_F(JsonFragmentScannerTest, InvalidBlockSize) { - FragmentScanRequest scan_request; - JsonFragmentScanOptions format_options; - JsonInspectedFragment inspected; - Executor* cpu_executor = nullptr; - - format_options.read_options.block_size = -1; // Invalid block size - - auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); - ASSERT_FALSE(result.ok()); - ASSERT_EQ(result.status().code(), StatusCode::Invalid); - ASSERT_EQ(result.status().message(), "Block size must be positive"); -} - -TEST_F(JsonFragmentScannerTest, ValidBlockSize) { - FragmentScanRequest scan_request; - JsonFragmentScanOptions format_options; - JsonInspectedFragment inspected; - Executor* cpu_executor = nullptr; - - format_options.read_options.block_size = 1024; // Valid block size - inspected.num_bytes = 2048; - - auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); - ASSERT_TRUE(result.ok()); -} - -TEST_F(JsonFragmentScannerTest, SingleLineJson) { - FragmentScanRequest scan_request; - JsonFragmentScanOptions format_options; - JsonInspectedFragment inspected; - Executor* cpu_executor = nullptr; - - format_options.read_options.block_size = 1024; - inspected.num_bytes = 1024; - - // Create a single-line JSON input stream - std::string json_content = R"({"key": "value"})"; - inspected.stream = std::make_shared(json_content); - - auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); - ASSERT_TRUE(result.ok()); -} - -TEST_F(JsonFragmentScannerTest, MultiLineJson) { - FragmentScanRequest scan_request; - JsonFragmentScanOptions format_options; - JsonInspectedFragment inspected; - Executor* cpu_executor = nullptr; - - format_options.read_options.block_size = 1024; - inspected.num_bytes = 2048; - - // Create a multi-line JSON input stream - std::string json_content = R"({"key1": "value1"} -{"key2": "value2"})"; - inspected.stream = std::make_shared(json_content); - - auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor); - ASSERT_TRUE(result.ok()); -} \ No newline at end of file