diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc
index 960a69c59db5d..dd3d025202018 100644
--- a/cpp/src/arrow/csv/parser_test.cc
+++ b/cpp/src/arrow/csv/parser_test.cc
@@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str,
   ASSERT_EQ(parsed_size, expected_size);
 }
 
+void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data,
+                        uint32_t expected_size) {
+  uint32_t parsed_size = static_cast<uint32_t>(-1);
+  ASSERT_OK(parser.Parse(data, &parsed_size));
+  ASSERT_EQ(parsed_size, expected_size);
+}
+
 void AssertLastRowEq(const BlockParser& parser,
                      const std::vector<std::string>& expected) {
   std::vector<std::string> values;
@@ -376,6 +383,21 @@ TEST(BlockParser, TruncatedData) {
   }
 }
 
+TEST(BlockParser, TruncatedDataViews) {
+  // The BlockParser API mandates that, when passing a vector of views,
+  // only the last view may be a truncated CSV block.
+  // In the current implementation, receiving a truncated non-last view
+  // simply stops parsing after that view.
+  BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3);
+  AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6);
+
+  // More sophisticated: non-last block ends on some newline inside a quoted string
+  // (terse reproducer of gh-39857)
+  AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0);
+  AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6);
+}
+
 TEST(BlockParser, Final) {
   // Tests for ParseFinal()
   BlockParser parser(ParseOptions::Defaults());
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 332fad054fea3..1ac25e290a814 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -261,11 +261,10 @@ class SerialBlockReader : public BlockReader {
     auto consume_bytes = [this, bytes_before_buffer,
                           next_buffer](int64_t nbytes) -> Status {
       DCHECK_GE(nbytes, 0);
-      auto offset = nbytes - bytes_before_buffer;
-      if (offset < 0) {
-        // Should not happen
-        return Status::Invalid("CSV parser got out of sync with chunker");
-      }
+      int64_t offset = nbytes - bytes_before_buffer;
+      // All data before the buffer should have been consumed.
+      // This is checked in Parse() and BlockParsingOperator::operator().
+      DCHECK_GE(offset, 0);
       partial_ = SliceBuffer(buffer_, offset);
       buffer_ = next_buffer;
       return Status::OK();
@@ -400,6 +399,7 @@ class BlockParsingOperator {
         count_rows_(first_row >= 0),
         num_rows_seen_(first_row) {}
 
+  // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor?
   Result<ParsedBlock> operator()(const CSVBlock& block) {
     constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
     auto parser = std::make_shared<BlockParser>(
@@ -427,9 +427,24 @@
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+
+    // `partial + completion` should have been entirely consumed.
+    const int64_t bytes_before_buffer = block.partial->size() + block.completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      // This can happen if `newlines_in_values` is not enabled and
+      // `partial + completion` ends with a newline inside a quoted string.
+      // In this case, the BlockParser stops at the truncated data in the first
+      // block (see gh-39857).
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
+
     RETURN_NOT_OK(block.consume_bytes(parsed_size));
     return ParsedBlock{std::move(parser), block.block_index,
                        static_cast<int64_t>(parsed_size) + block.bytes_skipped};
@@ -738,6 +753,15 @@ class ReaderMixin {
     } else {
       RETURN_NOT_OK(parser->Parse(views, &parsed_size));
     }
+    // See BlockParsingOperator for explanation.
+    const int64_t bytes_before_buffer = partial->size() + completion->size();
+    if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
+      return Status::Invalid(
+          "CSV parser got out of sync with chunker. This can mean the data file "
+          "contains cell values spanning multiple lines; please consider enabling "
+          "the option 'newlines_in_values'.");
+    }
+
     if (count_rows_) {
       num_rows_seen_ += parser->total_num_rows();
     }
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 31f24187e3b37..bc1dd8a09a768 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -667,6 +667,31 @@ def row_num(x):
             'b': ["e", "j"],
         }
 
+    def test_chunker_out_of_sync(self):
+        # GH-39892: if there are newlines in values, the parser may become
+        # out of sync with the chunker. In this case, we try to produce an
+        # informative error message.
+        rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n"""
+        expected = {
+            'a': ["d", "g"],
+            'b': ["e", "h"],
+            'c': ["f\n", "i"],
+        }
+        for block_size in range(8, 15):
+            # Sanity check: parsing works with newlines_in_values=True
+            d = self.read_bytes(
+                rows, parse_options=ParseOptions(newlines_in_values=True),
+                read_options=ReadOptions(block_size=block_size)).to_pydict()
+            assert d == expected
+        # With these block sizes, a block would end on the physical newline
+        # inside the quoted cell value, leading to a mismatch between
+        # CSV chunker and parser.
+        for block_size in range(8, 11):
+            with pytest.raises(ValueError,
+                               match="cell values spanning multiple lines"):
+                self.read_bytes(
+                    rows, read_options=ReadOptions(block_size=block_size))
+
 
 class BaseCSVTableRead(BaseTestCSV):
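Reviewer note: below is a minimal, standalone sketch of the failure mode this diff addresses, using only the public `pyarrow.csv` API. `block_size=9` is one of the failing sizes exercised by `test_chunker_out_of_sync` above, and the expected output and error message are taken from that test; the sketch assumes a build that includes this change (older builds hit the same desync, but with the terser "out of sync" message).

```python
# Sketch of the chunker/parser desync handled by this diff (gh-39857 / GH-39892).
import io

import pyarrow as pa
from pyarrow import csv

# A quoted cell value containing a physical newline.
data = b'a,b,c\nd,e,"f\n"\ng,h,i\n'

# With newlines_in_values=True, the chunker knows that quoted values may
# span lines, so parsing succeeds regardless of block size.
table = csv.read_csv(
    io.BytesIO(data),
    read_options=csv.ReadOptions(block_size=9),
    parse_options=csv.ParseOptions(newlines_in_values=True),
)
assert table.to_pydict() == {
    'a': ['d', 'g'], 'b': ['e', 'h'], 'c': ['f\n', 'i'],
}

# Without it, a block boundary can land on the newline inside the quoted
# value; chunker and parser then disagree, and the error added in this diff
# points the user at the 'newlines_in_values' option.
try:
    csv.read_csv(io.BytesIO(data), read_options=csv.ReadOptions(block_size=9))
except pa.ArrowInvalid as exc:
    assert "cell values spanning multiple lines" in str(exc)
else:
    raise AssertionError("expected ArrowInvalid")
```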