diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a7d80c2e96c23..ebe4a1e1c15e6 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -760,3 +760,4 @@ config_summary_message() if(${ARROW_BUILD_CONFIG_SUMMARY_JSON}) config_summary_json() endif() + diff --git a/cpp/src/arrow/dataset/file_json.cc b/cpp/src/arrow/dataset/file_json.cc index 1d545c3969f6a..4ea18137cf5a8 100644 --- a/cpp/src/arrow/dataset/file_json.cc +++ b/cpp/src/arrow/dataset/file_json.cc @@ -18,6 +18,7 @@ #include "arrow/dataset/file_json.h" #include +#include #include #include @@ -108,16 +109,43 @@ class JsonFragmentScanner : public FragmentScanner { parse_options.unexpected_field_behavior = json::UnexpectedFieldBehavior::Ignore; int64_t block_size = format_options.read_options.block_size; + if (block_size <= 0) { + return Status::Invalid("Block size must be positive"); + } + auto num_batches = static_cast(bit_util::CeilDiv(inspected.num_bytes, block_size)); + if (num_batches < 0) { + return Status::Invalid("Number of batches calculation overflowed"); + } - auto future = json::StreamingReader::MakeAsync( - inspected.stream, format_options.read_options, parse_options, - io::default_io_context(), cpu_executor); - return future.Then([num_batches, block_size](const ReaderPtr& reader) - -> Result> { - return std::make_shared(reader, num_batches, block_size); - }); + + auto future = json::( + inspected.stream, format_options.read_options, parse_options, + io::default_io_context(), cpu_executor); + +// ✅ Fix: Handle Single-Line JSON Case +return future.Then([num_batches, block_size](const ReaderPtr& reader) + -> Result> { + if (!reader) { + return Status::Invalid("Failed to create JSON Streaming Reader."); + } + + // Check if the input stream has only one JSON object and no newline + + auto stream_data = inspected.stream->Read(); + if (stream_data.ok()) { + std::string json_content = stream_data->ToString(); + if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue + } + + // Create a new InputStream with fixed content + inspected.stream = std::make_shared(Buffer::FromString(json_content)); + } + + return std::make_shared(reader, num_batches, block_size); + // Check if the input stream has only one JSON object and no newline } private: @@ -298,8 +326,25 @@ Result> DoOpenReader( json::UnexpectedFieldBehavior::Ignore; } return json::StreamingReader::MakeAsync( - std::move(state->stream), state->read_options, state->parse_options); - }); + std::move(state->stream), state->read_options, state->parse_options) + .Then([](const ReaderPtr& reader) -> Result { + auto stream_data = reader->stream->Read(); + if (!stream_data.ok()) { + return Status::IOError("Failed to read from input stream"); + } + + std::string json_content = stream_data->ToString(); + if (!json_content.empty() && json_content.back() != '\n') { + json_content += '\n'; // Append a newline to fix the issue + } + + // Create a new InputStream with fixed content + reader->stream = std::make_shared(Buffer::FromString(json_content)); + + return reader; + }, [path = source.path()](const Status& error) -> Result { + return error.WithMessage("Could not open JSON input source '", path, "': ", error); + }); ARROW_ASSIGN_OR_RAISE(auto future, maybe_future); return future.Then([](const ReaderPtr& reader) -> Result { return reader; }, [path = source.path()](const Status& error) -> Result {