From a6e577d031d20a1a7d3dd01536b9a77db5d1bff8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 6 Feb 2024 16:19:03 +0100 Subject: [PATCH] GH-39857: [C++] Improve error message for "chunker out of sync" condition (#39892) ### Rationale for this change When writing the CSV reader, we thought that the parser not finding the same line limits as the chunker should never happen, hence the terse "chunker out of sync" error message. It turns out that, if the input contains multiline cell values and the `newlines_in_values` option was not enabled, the chunker can happily delimit a block on a newline that's inside a quoted string. The parser will then see truncated data and will stop parsing, yielding a parsed size that's smaller than the first block (see added comment in the code). ### What changes are included in this PR? * Add some parser tests that showcase the condition encountered in GH-39857 * Improve error message to guide users towards the solution ### Are these changes tested? There's no functional change, the error message itself isn't tested. ### Are there any user-facing changes? No. 
* Closes: #39857 Authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/arrow/csv/parser_test.cc | 22 +++++++++++++++++++++ cpp/src/arrow/csv/reader.cc | 34 +++++++++++++++++++++++++++----- python/pyarrow/tests/test_csv.py | 25 +++++++++++++++++++++++ 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/parser_test.cc b/cpp/src/arrow/csv/parser_test.cc index 960a69c59db5d..dd3d025202018 100644 --- a/cpp/src/arrow/csv/parser_test.cc +++ b/cpp/src/arrow/csv/parser_test.cc @@ -175,6 +175,13 @@ void AssertParsePartial(BlockParser& parser, const std::string& str, ASSERT_EQ(parsed_size, expected_size); } +void AssertParsePartial(BlockParser& parser, const std::vector<std::string_view>& data, + uint32_t expected_size) { + uint32_t parsed_size = static_cast<uint32_t>(-1); + ASSERT_OK(parser.Parse(data, &parsed_size)); + ASSERT_EQ(parsed_size, expected_size); +} + void AssertLastRowEq(const BlockParser& parser, const std::vector<std::string>& expected) { std::vector<std::string> values; @@ -376,6 +383,21 @@ TEST(BlockParser, TruncatedData) { } } +TEST(BlockParser, TruncatedDataViews) { + // The BlockParser API mandates that, when passing a vector of views, + // only the last view may be a truncated CSV block. + // In the current implementation, receiving a truncated non-last view + // simply stops parsing after that view. 
+ BlockParser parser(ParseOptions::Defaults(), /*num_cols=*/3); + AssertParsePartial(parser, Views({"a,b,", "c\n"}), 0); + AssertParsePartial(parser, Views({"a,b,c\nd,", "e,f\n"}), 6); + + // More sophisticated: non-last block ends on some newline inside a quoted string + // (terse reproducer of gh-39857) + AssertParsePartial(parser, Views({"a,b,\"c\n", "\"\n"}), 0); + AssertParsePartial(parser, Views({"a,b,c\n\"d\n", "\",e,f\n"}), 6); +} + TEST(BlockParser, Final) { // Tests for ParseFinal() BlockParser parser(ParseOptions::Defaults()); diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 332fad054fea3..1ac25e290a814 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -261,11 +261,10 @@ class SerialBlockReader : public BlockReader { auto consume_bytes = [this, bytes_before_buffer, next_buffer](int64_t nbytes) -> Status { DCHECK_GE(nbytes, 0); - auto offset = nbytes - bytes_before_buffer; - if (offset < 0) { - // Should not happen - return Status::Invalid("CSV parser got out of sync with chunker"); - } + int64_t offset = nbytes - bytes_before_buffer; + // All data before the buffer should have been consumed. + // This is checked in Parse() and BlockParsingOperator::operator(). + DCHECK_GE(offset, 0); partial_ = SliceBuffer(buffer_, offset); buffer_ = next_buffer; return Status::OK(); @@ -400,6 +399,7 @@ class BlockParsingOperator { count_rows_(first_row >= 0), num_rows_seen_(first_row) {} + // TODO: this is almost entirely the same as ReaderMixin::Parse(). Refactor? Result<ParsedBlock> operator()(const CSVBlock& block) { constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); auto parser = std::make_shared<BlockParser>( @@ -427,9 +427,24 @@ class BlockParsingOperator { } else { RETURN_NOT_OK(parser->Parse(views, &parsed_size)); } + + // `partial + completion` should have been entirely consumed. 
+ const int64_t bytes_before_buffer = block.partial->size() + block.completion->size(); + if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) { + // This can happen if `newlines_in_values` is not enabled and + // `partial + completion` ends with a newline inside a quoted string. + // In this case, the BlockParser stops at the truncated data in the first + // block (see gh-39857). + return Status::Invalid( + "CSV parser got out of sync with chunker. This can mean the data file " + "contains cell values spanning multiple lines; please consider enabling " + "the option 'newlines_in_values'."); + } + if (count_rows_) { num_rows_seen_ += parser->total_num_rows(); } + RETURN_NOT_OK(block.consume_bytes(parsed_size)); return ParsedBlock{std::move(parser), block.block_index, static_cast<int64_t>(parsed_size) + block.bytes_skipped}; @@ -738,6 +753,15 @@ class ReaderMixin { } else { RETURN_NOT_OK(parser->Parse(views, &parsed_size)); } + // See BlockParsingOperator for explanation. + const int64_t bytes_before_buffer = partial->size() + completion->size(); + if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) { + return Status::Invalid( + "CSV parser got out of sync with chunker. This can mean the data file " + "contains cell values spanning multiple lines; please consider enabling " + "the option 'newlines_in_values'."); + } + if (count_rows_) { num_rows_seen_ += parser->total_num_rows(); } diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 31f24187e3b37..bc1dd8a09a768 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -667,6 +667,31 @@ def row_num(x): 'b': ["e", "j"], } + def test_chunker_out_of_sync(self): + # GH-39892: if there are newlines in values, the parser may become + # out of sync with the chunker. In this case, we try to produce an + # informative error message. 
+ rows = b"""a,b,c\nd,e,"f\n"\ng,h,i\n""" + expected = { + 'a': ["d", "g"], + 'b': ["e", "h"], + 'c': ["f\n", "i"], + } + for block_size in range(8, 15): + # Sanity check: parsing works with newlines_in_values=True + d = self.read_bytes( + rows, parse_options=ParseOptions(newlines_in_values=True), + read_options=ReadOptions(block_size=block_size)).to_pydict() + assert d == expected + # With these block sizes, a block would end on the physical newline + # inside the quoted cell value, leading to a mismatch between + # CSV chunker and parser. + for block_size in range(8, 11): + with pytest.raises(ValueError, + match="cell values spanning multiple lines"): + self.read_bytes( + rows, read_options=ReadOptions(block_size=block_size)) + class BaseCSVTableRead(BaseTestCSV):