From acde12c106af9901afd85b6f0c8f22f69535535c Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Fri, 10 Jan 2025 15:35:03 -0800 Subject: [PATCH] Extend streaming tests with coordinated commits (2/2) --- .../spark/sql/delta/DeltaSinkImplicitCastSuite.scala | 12 +++++++++++- .../org/apache/spark/sql/delta/DeltaSinkSuite.scala | 12 +++++++++++- .../spark/sql/delta/DeltaSourceTableAPISuite.scala | 12 +++++++++++- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala index 84c471740a4..0e73deefc99 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkImplicitCastSuite.scala @@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp} import scala.concurrent.duration._ +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf} import org.apache.spark.{SparkArithmeticException, SparkThrowable} @@ -119,7 +120,8 @@ abstract class DeltaSinkImplicitCastSuiteBase extends DeltaSinkTest { /** * Covers handling implicit casting to handle type mismatches when writing data to a Delta sink. */ -class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase { +class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase + with CoordinatedCommitsBaseSuite { import testImplicits._ test(s"write wider type - long -> int") { @@ -550,3 +552,11 @@ class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase { } } } + +class DeltaSinkImplicitCastWithCoordinatedCommitsBatch1Suite extends DeltaSinkImplicitCastSuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1) +} + +class DeltaSinkImplicitCastWithCoordinatedCommitsBatch100Suite extends DeltaSinkImplicitCastSuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100) +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala index 0a2826c0ded..efd25416de7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala @@ -21,6 +21,7 @@ import java.util.Locale // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.actions.CommitInfo +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf} import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} import org.apache.commons.io.FileUtils @@ -71,7 +72,8 @@ abstract class DeltaSinkTest class DeltaSinkSuite extends DeltaSinkTest - with DeltaColumnMappingTestUtils { + with DeltaColumnMappingTestUtils + with CoordinatedCommitsBaseSuite { import testImplicits._ @@ -617,6 +619,14 @@ class DeltaSinkSuite } +class DeltaSinkWithCoordinatedCommitsBatch1Suite extends DeltaSinkSuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1) +} + +class DeltaSinkWithCoordinatedCommitsBatch100Suite extends DeltaSinkSuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100) +} + abstract class DeltaSinkColumnMappingSuiteBase extends DeltaSinkSuite with DeltaColumnMappingSelectedTestMixin { import testImplicits._ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceTableAPISuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceTableAPISuite.scala index 35b8a60151a..95a74f08d4e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceTableAPISuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceTableAPISuite.scala @@ -20,6 +20,7 @@ import java.io.File import scala.language.implicitConversions +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.{AnalysisException, Dataset} @@ -30,7 +31,8 @@ import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest} import org.apache.spark.util.Utils class DeltaSourceTableAPISuite extends StreamTest - with DeltaSQLCommandTest { + with DeltaSQLCommandTest + with CoordinatedCommitsBaseSuite { override def beforeAll(): Unit = { super.beforeAll() @@ -250,3 +252,11 @@ class DeltaSourceTableAPISuite extends StreamTest } } } + +class DeltaSourceTableAPIWithCoordinatedCommitsBatch1Suite extends DeltaSourceTableAPISuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1) +} + +class DeltaSourceTableAPIWithCoordinatedCommitsBatch100Suite extends DeltaSourceTableAPISuite { + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100) +}