Skip to content

Commit

Permalink
Extend streaming tests with coordinated commits (2/2)
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Jan 10, 2025
1 parent a920885 commit acde12c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}

import scala.concurrent.duration._

import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf}

import org.apache.spark.{SparkArithmeticException, SparkThrowable}
Expand Down Expand Up @@ -119,7 +120,8 @@ abstract class DeltaSinkImplicitCastSuiteBase extends DeltaSinkTest {
/**
* Covers handling implicit casting to handle type mismatches when writing data to a Delta sink.
*/
class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase {
class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase
with CoordinatedCommitsBaseSuite {
import testImplicits._

test(s"write wider type - long -> int") {
Expand Down Expand Up @@ -550,3 +552,11 @@ class DeltaSinkImplicitCastSuite extends DeltaSinkImplicitCastSuiteBase {
}
}
}

class DeltaSinkImplicitCastWithCoordinatedCommitsBatch1Suite extends DeltaSinkImplicitCastSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
}

class DeltaSinkImplicitCastWithCoordinatedCommitsBatch100Suite extends DeltaSinkImplicitCastSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.Locale

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.sources.{DeltaSink, DeltaSQLConf}
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.commons.io.FileUtils
Expand Down Expand Up @@ -71,7 +72,8 @@ abstract class DeltaSinkTest

class DeltaSinkSuite
extends DeltaSinkTest
with DeltaColumnMappingTestUtils {
with DeltaColumnMappingTestUtils
with CoordinatedCommitsBaseSuite {

import testImplicits._

Expand Down Expand Up @@ -617,6 +619,14 @@ class DeltaSinkSuite

}

class DeltaSinkWithCoordinatedCommitsBatch1Suite extends DeltaSinkSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
}

class DeltaSinkWithCoordinatedCommitsBatch100Suite extends DeltaSinkSuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}

abstract class DeltaSinkColumnMappingSuiteBase extends DeltaSinkSuite
with DeltaColumnMappingSelectedTestMixin {
import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File

import scala.language.implicitConversions

import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.sql.{AnalysisException, Dataset}
Expand All @@ -30,7 +31,8 @@ import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest}
import org.apache.spark.util.Utils

class DeltaSourceTableAPISuite extends StreamTest
with DeltaSQLCommandTest {
with DeltaSQLCommandTest
with CoordinatedCommitsBaseSuite {

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -250,3 +252,11 @@ class DeltaSourceTableAPISuite extends StreamTest
}
}
}

class DeltaSourceTableAPIWithCoordinatedCommitsBatch1Suite extends DeltaSourceTableAPISuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
}

class DeltaSourceTableAPIWithCoordinatedCommitsBatch100Suite extends DeltaSourceTableAPISuite {
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}

0 comments on commit acde12c

Please sign in to comment.