Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5.3 support #1178

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public DeltaFixture()
(3, 3, 2) => "delta-core_2.12:2.3.0",
(3, 3, 3) => "delta-core_2.12:2.3.0",
(3, 3, 4) => "delta-core_2.12:2.3.0",
(3, 5, _) => "delta-spark_2.12:3.2.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};

Expand Down
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static IEnumerable<object[]> VersionData() =>
new object[] { Versions.V3_0_0 },
new object[] { Versions.V3_2_0 },
new object[] { Versions.V3_3_0 },
new object[] { Versions.V3_5_1 },
};

internal static Payload GetDefaultPayload()
Expand Down
22 changes: 10 additions & 12 deletions src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,32 @@ internal TaskContext Process(Stream stream)
// Deserializes the pre-3.3 TaskContext payload from the worker's input stream.
// The SerDe.Read* calls must run in exactly this order: each call consumes
// bytes in sequence, so reordering any initializer corrupts every later field.
// NOTE(review): this is a diff excerpt — the barrier fields below (IsBarrier,
// Port, Secret) look like the lines this PR removes in favor of the separate
// ReadBarrierInfo helper; confirm against the merged file before reusing.
private static TaskContext ReadTaskContext_2_x(Stream stream)
=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
};

// Needed for 3.3.0+
// https://issues.apache.org/jira/browse/SPARK-36173
private static TaskContext ReadTaskContext_3_3(Stream stream)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if ReadTaskContext_3_3 can rely on ReadTaskContext_2_x.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it would be nice to reuse common logic; let's have a look at it after removing support for the obsolete Spark versions.

=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
// CPUs field is added into TaskContext from 3.3.0 https://issues.apache.org/jira/browse/SPARK-36173
CPUs = SerDe.ReadInt32(stream)
};

// Reads and discards the barrier-related header fields (IsBarrier, BoundPort,
// Secret) so the stream is left positioned at the start of the TaskContext
// payload. Barrier execution mode is not supported, so the values themselves
// are intentionally ignored — only the byte consumption matters.
private static void ReadBarrierInfo(Stream stream)
{
// Read barrier-related payload. Note that barrier is currently not supported.
SerDe.ReadBool(stream); // IsBarrier
SerDe.ReadInt32(stream); // BoundPort
SerDe.ReadString(stream); // Secret
}

private static void ReadTaskContextProperties(Stream stream, TaskContext taskContext)
{
int numProperties = SerDe.ReadInt32(stream);
Expand Down Expand Up @@ -87,7 +88,6 @@ private static class TaskContextProcessorV2_4_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextProperties(stream, taskContext);

Expand All @@ -99,7 +99,6 @@ private static class TaskContextProcessorV3_0_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
Expand All @@ -112,7 +111,6 @@ private static class TaskContextProcessorV3_3_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_3_3(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
Expand Down
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ internal static class Versions
internal const string V3_1_1 = "3.1.1";
internal const string V3_2_0 = "3.2.0";
internal const string V3_3_0 = "3.3.0";
internal const string V3_5_1 = "3.5.1";
}
}
6 changes: 3 additions & 3 deletions src/scala/microsoft-spark-3-5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
<artifactId>microsoft-spark</artifactId>
<version>${microsoft-spark.version}</version>
</parent>
<artifactId>microsoft-spark-3-2_2.12</artifactId>
<artifactId>microsoft-spark-3-5_2.12</artifactId>
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.10</scala.version>
<scala.version>2.12.18</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.2.0</spark.version>
<spark.version>3.5.1</spark.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import scala.util.Try
*/
object DotnetRunner extends Logging {
private val DEBUG_PORT = 5567
private val supportedSparkMajorMinorVersionPrefix = "3.2"
private val supportedSparkVersions = Set[String]("3.2.0", "3.2.1", "3.2.2", "3.2.3")
private val supportedSparkMajorMinorVersionPrefix = "3.5"
private val supportedSparkVersions = Set[String]("3.5.0", "3.5.1", "3.5.2", "3.5.3")

val SPARK_VERSION = DotnetUtils.normalizeSparkVersion(spark.SPARK_VERSION)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package org.apache.spark.sql.api.dotnet

import java.util.{List => JList, Map => JMap}

import org.apache.spark.api.python.{PythonAccumulatorV2, PythonBroadcast, PythonFunction}
import org.apache.spark.api.python.{PythonAccumulatorV2, PythonBroadcast, PythonFunction, SimplePythonFunction}
import org.apache.spark.broadcast.Broadcast

object SQLUtils {
Expand All @@ -24,8 +24,8 @@ object SQLUtils {
pythonVersion: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: PythonAccumulatorV2): PythonFunction = {

PythonFunction(
// From 3.4.0 use SimplePythonFunction. https://github.com/apache/spark/commit/18ff15729268def5ee1bdf5dfcb766bd1d699684
SimplePythonFunction(
command,
envVars,
pythonIncludes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class UtilsTest {

@Test
def shouldIgnorePatchVersion(): Unit = {
val sparkVersion = "3.2.1"
val sparkMajorMinorVersionPrefix = "3.2"
val supportedSparkVersions = Set[String]("3.2.0")
val sparkVersion = "3.5.1"
val sparkMajorMinorVersionPrefix = "3.5"
val supportedSparkVersions = Set[String]("3.5.0")

Utils.validateSparkVersions(
true,
Expand All @@ -30,10 +30,10 @@ class UtilsTest {

@Test
def shouldThrowForUnsupportedVersion(): Unit = {
val sparkVersion = "3.2.1"
val sparkVersion = "3.5.1"
val normalizedSparkVersion = Utils.normalizeSparkVersion(sparkVersion)
val sparkMajorMinorVersionPrefix = "3.2"
val supportedSparkVersions = Set[String]("3.2.0")
val sparkMajorMinorVersionPrefix = "3.5"
val supportedSparkVersions = Set[String]("3.5.0")

val exception = assertThrows(
classOf[IllegalArgumentException],
Expand All @@ -55,10 +55,10 @@ class UtilsTest {

@Test
def shouldThrowForUnsupportedMajorMinorVersion(): Unit = {
val sparkVersion = "2.4.4"
val sparkVersion = "3.3.0"
val normalizedSparkVersion = Utils.normalizeSparkVersion(sparkVersion)
val sparkMajorMinorVersionPrefix = "3.2"
val supportedSparkVersions = Set[String]("3.2.0")
val sparkMajorMinorVersionPrefix = "3.5"
val supportedSparkVersions = Set[String]("3.5.0")

val exception = assertThrows(
classOf[IllegalArgumentException],
Expand Down
1 change: 1 addition & 0 deletions src/scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<module>microsoft-spark-3-1</module>
<module>microsoft-spark-3-2</module>
<module>microsoft-spark-3-3</module>
<module>microsoft-spark-3-5</module>
</modules>

<pluginRepositories>
Expand Down