diff --git a/src/csharp/Microsoft.Spark.E2ETest.ExternalLibrary/ExternalClass.cs b/src/csharp/Microsoft.Spark.E2ETest.ExternalLibrary/ExternalClass.cs index db525172d..f49d0f06c 100644 --- a/src/csharp/Microsoft.Spark.E2ETest.ExternalLibrary/ExternalClass.cs +++ b/src/csharp/Microsoft.Spark.E2ETest.ExternalLibrary/ExternalClass.cs @@ -6,7 +6,6 @@ namespace Microsoft.Spark.E2ETest.ExternalLibrary { - [Serializable] public class ExternalClass { private string _s; diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs index e0443f04c..a51a4b8c9 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs @@ -6,7 +6,6 @@ namespace Microsoft.Spark.E2ETest.IpcTests { - [Serializable] public class TestBroadcastVariable { public int IntValue { get; private set; } diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs index 539b41ec3..4b137dc74 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs @@ -298,7 +298,6 @@ private void WriteCsv(int start, int count, string path) } } - [Serializable] private class TestForeachWriter : IForeachWriter { [NonSerialized] @@ -354,7 +353,6 @@ public virtual void Process(Row value) } } - [Serializable] private class TestForeachWriterOpenFailure : TestForeachWriter { public override bool Open(long partitionId, long epochId) @@ -364,7 +362,6 @@ public override bool Open(long partitionId, long epochId) } } - [Serializable] private class TestForeachWriterProcessFailure : TestForeachWriter { public override void Process(Row value) diff --git a/src/csharp/Microsoft.Spark.UnitTest/Microsoft.Spark.UnitTest.csproj b/src/csharp/Microsoft.Spark.UnitTest/Microsoft.Spark.UnitTest.csproj index d863334da..d50ee3271 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/Microsoft.Spark.UnitTest.csproj +++ b/src/csharp/Microsoft.Spark.UnitTest/Microsoft.Spark.UnitTest.csproj @@ -7,7 +7,8 @@ - + + diff --git a/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs b/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs index b655fdf1b..56b08ab64 100644 --- a/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs +++ b/src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs @@ -5,20 +5,24 @@ using System; using System.IO; using System.Reflection; -using System.Runtime.Serialization.Formatters.Binary; using Microsoft.Spark.Utils; using Xunit; +using MessagePack; namespace Microsoft.Spark.UnitTest { [Collection("Spark Unit Tests")] public class UdfSerDeTests { - [Serializable] private class TestClass { private readonly string _str; + // TODO: find out why MessagePack is requiring a parameterless constructor. + public TestClass() + { + } + public TestClass(string s) { _str = s; @@ -149,16 +153,13 @@ private Delegate SerDe(Delegate udf) return Deserialize(Serialize(udf)); } -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. private byte[] Serialize(Delegate udf) { UdfSerDe.UdfData udfData = UdfSerDe.Serialize(udf); using (var ms = new MemoryStream()) { - var bf = new BinaryFormatter(); - bf.Serialize(ms, udfData); + MessagePackSerializer.Typeless.Serialize(ms, udfData); return ms.ToArray(); } } @@ -167,11 +168,9 @@ private Delegate Deserialize(byte[] serializedUdf) { using (var ms = new MemoryStream(serializedUdf, false)) { - var bf = new BinaryFormatter(); - UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)bf.Deserialize(ms); + UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)MessagePackSerializer.Typeless.Deserialize(ms); return UdfSerDe.Deserialize(udfData); } } -#pragma warning restore } } diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs index 7e8887975..1b904d8f1 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs @@ -8,7 +8,6 @@ using System.IO; using System.Linq; using System.Reflection; -using System.Runtime.Serialization.Formatters.Binary; using System.Threading; using System.Threading.Tasks; using Apache.Arrow; @@ -21,6 +20,7 @@ using Razorvine.Pickle; using Xunit; using static Microsoft.Spark.UnitTest.TestUtils.ArrowTestUtils; +using MessagePack; namespace Microsoft.Spark.Worker.UnitTest { @@ -1050,7 +1050,6 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions) using var inputStream = new MemoryStream(); using var outputStream = new MemoryStream(); // Write test data to the input stream. - var formatter = new BinaryFormatter(); var memoryStream = new MemoryStream(); var inputs = new int[] { 0, 1, 2, 3, 4 }; @@ -1059,10 +1058,7 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions) foreach (int input in inputs) { memoryStream.Position = 0; -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. - formatter.Serialize(memoryStream, input); -#pragma warning restore SYSLIB0011 // Type or member is obsolete + MessagePackSerializer.Typeless.Serialize(memoryStream, input); values.Add(memoryStream.ToArray()); } @@ -1092,12 +1088,9 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions) for (int i = 0; i < inputs.Length; ++i) { Assert.True(SerDe.ReadInt32(outputStream) > 0); -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. Assert.Equal( mapUdf(i), - formatter.Deserialize(outputStream)); -#pragma warning restore SYSLIB0011 // Type or member is obsolete + MessagePackSerializer.Typeless.Deserialize(outputStream)); } // Validate all the data on the stream is read. diff --git a/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj b/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj index 9e06e5ca8..196e099a9 100644 --- a/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj +++ b/src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj @@ -4,6 +4,7 @@ + diff --git a/src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs b/src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs index 830903ea9..0a1661e98 100644 --- a/src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs @@ -5,9 +5,9 @@ using System; using System.Collections.Generic; using System.IO; -using System.Runtime.Serialization.Formatters.Binary; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Utils; +using MessagePack; namespace Microsoft.Spark.Worker.Command { @@ -19,8 +19,6 @@ internal class RDDCommandExecutor { [ThreadStatic] private static MemoryStream s_writeOutputStream; - [ThreadStatic] - private static BinaryFormatter s_binaryFormatter; /// /// Executes the commands on the input data read from input stream @@ -111,11 +109,7 @@ private void Serialize( switch (serializerMode) { case CommandSerDe.SerializedMode.Byte: - BinaryFormatter formatter = s_binaryFormatter ??= new BinaryFormatter(); -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. - formatter.Serialize(stream, message); -#pragma warning restore SYSLIB0011 // Type or member is obsolete + MessagePackSerializer.Typeless.Serialize(stream, message); break; case CommandSerDe.SerializedMode.None: case CommandSerDe.SerializedMode.String: diff --git a/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj b/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj index cd5e6d0eb..1a682bac9 100644 --- a/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj +++ b/src/csharp/Microsoft.Spark.Worker/Microsoft.Spark.Worker.csproj @@ -11,7 +11,8 @@ - + + diff --git a/src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableProcessor.cs b/src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableProcessor.cs index 353358e44..c8e8f0d32 100644 --- a/src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableProcessor.cs +++ b/src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableProcessor.cs @@ -6,9 +6,9 @@ using System.Diagnostics; using System.IO; using System.Net; -using System.Runtime.Serialization.Formatters.Binary; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Network; +using MessagePack; namespace Microsoft.Spark.Worker.Processor { @@ -47,7 +47,6 @@ internal BroadcastVariables Process(Stream stream) } } - var formatter = new BinaryFormatter(); for (int i = 0; i < broadcastVars.Count; ++i) { long bid = SerDe.ReadInt64(stream); @@ -62,10 +61,7 @@ internal BroadcastVariables Process(Stream stream) $"server {readBid} is different from the Broadcast Id received " + $"from the payload {bid}."); } -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. - object value = formatter.Deserialize(socket.InputStream); -#pragma warning restore SYSLIB0011 // Type or member is obsolete + object value = MessagePackSerializer.Typeless.Deserialize(socket.InputStream); BroadcastRegistry.Add(bid, value); } else @@ -73,10 +69,7 @@ internal BroadcastVariables Process(Stream stream) string path = SerDe.ReadString(stream); using FileStream fStream = File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read); -#pragma warning disable SYSLIB0011 // Type or member is obsolete - // TODO: Replace BinaryFormatter with a new, secure serializer. - object value = formatter.Deserialize(fStream); -#pragma warning restore SYSLIB0011 // Type or member is obsolete + object value = MessagePackSerializer.Typeless.Deserialize(fStream); BroadcastRegistry.Add(bid, value); } } diff --git a/src/csharp/Microsoft.Spark/Broadcast.cs b/src/csharp/Microsoft.Spark/Broadcast.cs index c26ad5329..4bc634285 100644 --- a/src/csharp/Microsoft.Spark/Broadcast.cs +++ b/src/csharp/Microsoft.Spark/Broadcast.cs @@ -4,12 +4,12 @@ using System.IO; using System.Net; using System.Runtime.Serialization; -using System.Runtime.Serialization.Formatters.Binary; using System.Threading; using Microsoft.Spark.Interop; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Network; using Microsoft.Spark.Services; +using MessagePack; namespace Microsoft.Spark { @@ -20,7 +20,6 @@ namespace Microsoft.Spark /// also attempts to distribute broadcast variables using efficient broadcast algorithms to /// reduce communication cost. /// - [Serializable] public sealed class Broadcast : IJvmObjectReferenceProvider { [NonSerialized] @@ -223,8 +222,7 @@ private void WriteToFile(object value) /// Stream to which the object is serialized private void Dump(object value, Stream stream) { - var formatter = new BinaryFormatter(); - formatter.Serialize(stream, value); + MessagePackSerializer.Typeless.Serialize(stream, value); } } diff --git a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj index 6bd14033e..9ad935bf3 100644 --- a/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj +++ b/src/csharp/Microsoft.Spark/Microsoft.Spark.csproj @@ -33,7 +33,8 @@ - + + diff --git a/src/csharp/Microsoft.Spark/RDD/Collector.cs b/src/csharp/Microsoft.Spark/RDD/Collector.cs index 9acee09bc..c6e003cd9 100644 --- a/src/csharp/Microsoft.Spark/RDD/Collector.cs +++ b/src/csharp/Microsoft.Spark/RDD/Collector.cs @@ -6,11 +6,11 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using System.Runtime.Serialization.Formatters.Binary; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; using Microsoft.Spark.Utils; using static Microsoft.Spark.Utils.CommandSerDe; +using MessagePack; namespace Microsoft.Spark.RDD { @@ -66,15 +66,13 @@ internal interface IDeserializer } /// - /// Deserializer using the BinaryFormatter. + /// Deserializer using MessagePack. /// private sealed class BinaryDeserializer : IDeserializer { - private readonly BinaryFormatter _formater = new BinaryFormatter(); - public object Deserialize(Stream stream, int length) { - return _formater.Deserialize(stream); + return MessagePackSerializer.Typeless.Deserialize(stream); } } diff --git a/src/csharp/Microsoft.Spark/SparkContext.cs b/src/csharp/Microsoft.Spark/SparkContext.cs index 248005b65..48b110f5d 100644 --- a/src/csharp/Microsoft.Spark/SparkContext.cs +++ b/src/csharp/Microsoft.Spark/SparkContext.cs @@ -4,11 +4,11 @@ using System.Collections.Generic; using System.IO; -using System.Runtime.Serialization.Formatters.Binary; using Microsoft.Spark.Hadoop.Conf; using Microsoft.Spark.Interop.Internal.Scala; using Microsoft.Spark.Interop.Ipc; using static Microsoft.Spark.Utils.CommandSerDe; +using MessagePack; namespace Microsoft.Spark { @@ -225,13 +225,12 @@ public void ClearJobGroup() /// RDD representing distributed collection internal RDD Parallelize(IEnumerable seq, int? numSlices = null) { - var formatter = new BinaryFormatter(); using var memoryStream = new MemoryStream(); var values = new List(); foreach (T obj in seq) { - formatter.Serialize(memoryStream, obj); + MessagePackSerializer.Typeless.Serialize(memoryStream, obj); values.Add(memoryStream.ToArray()); memoryStream.SetLength(0); } diff --git a/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs b/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs index d2343fd29..14820dd2c 100644 --- a/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs @@ -8,10 +8,10 @@ using System.IO; using System.Linq; using System.Reflection; -using System.Runtime.Serialization.Formatters.Binary; using System.Text; using Microsoft.Spark.Interop.Ipc; using Microsoft.Spark.Sql; +using MessagePack; namespace Microsoft.Spark.Utils { @@ -46,7 +46,6 @@ internal enum SerializedMode /// * /// * /// - [Serializable] private sealed class UdfWrapperNode { /// @@ -86,7 +85,6 @@ private sealed class UdfWrapperNode /// [ UDF#1, UDF#2, UDF#3 ] /// /// - [Serializable] private sealed class UdfWrapperData { /// @@ -159,10 +157,9 @@ internal static byte[] Serialize( Udfs = udfs.ToArray() }; - var formatter = new BinaryFormatter(); using (var stream = new MemoryStream()) { - formatter.Serialize(stream, udfWrapperData); + MessagePackSerializer.Typeless.Serialize(stream, udfWrapperData); byte[] udfBytes = stream.ToArray(); byte[] udfBytesLengthAsBytes = BitConverter.GetBytes(udfBytes.Length); @@ -291,10 +288,9 @@ private static UdfWrapperData GetUdfWrapperDataFromStream( byte[] serializedCommand = SerDe.ReadBytes(stream); - var bf = new BinaryFormatter(); var ms = new MemoryStream(serializedCommand, false); - return (UdfWrapperData)bf.Deserialize(ms); + return (UdfWrapperData)MessagePackSerializer.Typeless.Deserialize(ms); } internal static T Deserialize( diff --git a/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs b/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs index 3954151d1..8c8f18e00 100644 --- a/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs +++ b/src/csharp/Microsoft.Spark/Utils/DependencyProviderUtils.cs @@ -4,7 +4,7 @@ using System; using System.IO; -using System.Runtime.Serialization.Formatters.Binary; +using MessagePack; namespace Microsoft.Spark.Utils { @@ -26,7 +26,6 @@ internal static string[] GetMetadataFiles(string path) => internal static string CreateFileName(long number) => s_filePattern.Replace("*", $"{number:D19}"); - [Serializable] internal class NuGetMetadata { public string FileName { get; set; } @@ -53,7 +52,6 @@ private bool Equals(NuGetMetadata other) } } - [Serializable] internal class Metadata { public string[] AssemblyProbingPaths { get; set; } @@ -74,15 +72,13 @@ public override bool Equals(object obj) internal static Metadata Deserialize(string path) { using FileStream fileStream = File.OpenRead(path); - var formatter = new BinaryFormatter(); - return (Metadata)formatter.Deserialize(fileStream); + return MessagePackSerializer.Deserialize(fileStream, MessagePack.Resolvers.ContractlessStandardResolverAllowPrivate.Options); } internal void Serialize(string path) { using FileStream fileStream = File.OpenWrite(path); - var formatter = new BinaryFormatter(); - formatter.Serialize(fileStream, this); + MessagePackSerializer.Serialize(fileStream, this, MessagePack.Resolvers.ContractlessStandardResolverAllowPrivate.Options); } private bool Equals(Metadata other) diff --git a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs index 0d457188e..9e8944975 100644 --- a/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs +++ b/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs @@ -21,7 +21,6 @@ internal class UdfSerDe private static readonly ConcurrentDictionary s_typeCache = new ConcurrentDictionary(); - [Serializable] internal sealed class TypeData : IEquatable { public string Name { get; set; } @@ -50,7 +49,6 @@ public bool Equals(TypeData other) } } - [Serializable] internal sealed class UdfData { public TypeData TypeData { get; set; } @@ -77,7 +75,6 @@ public bool Equals(UdfData other) } } - [Serializable] internal sealed class TargetData { public TypeData TypeData { get; set; } @@ -112,7 +109,6 @@ public bool Equals(TargetData other) } } - [Serializable] internal sealed class FieldData { public TypeData TypeData { get; set; }