Refactor serialization logic and remove JsonSerializer.Deserialize use for entire envelope #192

Merged (5 commits) on Mar 26, 2025

@GarrettBeatty GarrettBeatty commented Mar 19, 2025

Issue #, if available: DOTNET-7873

Description of changes:
The main change is that we no longer call JsonSerializer.Deserialize on the whole sqs.Body. That call parsed both the envelope (which contains the metadata fields) and the data field inside it. Previously this was fine because we always stored data as an encoded JSON string. However, I am working on changing that behavior so that data is stored as the actual JSON object, and at that point JsonSerializer.Deserialize on the whole body fails. Instead we have to:

  1. Parse outer wrapper using JsonDocument.Parse. This will parse everything except the data field.
  2. Deserialize data using _messageSerializer.Deserialize.

This allows users to implement any kind of message serializer logic for the data field. It could be JSON, XML, etc.
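The two steps above can be sketched roughly as follows. This is only an illustration under assumptions: the envelope field names are borrowed from the example later in this thread, and the `deserializeData` delegate is a hypothetical stand-in for `_messageSerializer.Deserialize`, whose real signature is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical envelope in which "data" is a JSON object, not an encoded string.
var body = """
{
  "id": "order-123",
  "source": "com.myapp.orders",
  "type": "OrderCreated",
  "data": { "Name": "Foo" }
}
""";

// Stand-in for _messageSerializer.Deserialize (assumed shape, not the library's API).
Func<string, Type, object?> deserializeData =
    (payload, type) => JsonSerializer.Deserialize(payload, type);

string id, messageType, rawData;
using (var document = JsonDocument.Parse(body))
{
    // Step 1: read the envelope metadata without deserializing "data".
    var root = document.RootElement;
    id = root.GetProperty("id").GetString()!;
    messageType = root.GetProperty("type").GetString()!;

    // Accept both the old format (encoded string) and the new one (JSON object).
    var dataElement = root.GetProperty("data");
    rawData = dataElement.ValueKind == JsonValueKind.String
        ? dataElement.GetString()!
        : dataElement.GetRawText();
}

// Step 2: hand only the data payload to the pluggable serializer.
var message = (Dictionary<string, string>)deserializeData(rawData, typeof(Dictionary<string, string>))!;
Console.WriteLine($"{id} {messageType} {message["Name"]}");
```

Because only the raw payload reaches the serializer, a non-JSON serializer could interpret that string however it likes.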

Other changes

  1. Moved parsing logic into its own classes (e.g. SQSParser, SNSParser, EventBridgeParser).

I have also validated that there are no trim warnings, and I created the console app below to verify it works.

Testing

  1. Unit and integration tests pass
  2. I made a console app to verify it still works
using AWS.Messaging;
using MessagingLocalAOTTest;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = new HostApplicationBuilder();

var queueUrl = "url";
builder.Services.AddAWSMessageBus(MessageFunctionJsonSerializerContext.Default, builder =>
{
    builder.AddSQSPublisher<ProductDTO>(queueUrl);
    builder.AddMessageHandler<ProductHandler, ProductDTO>();

    builder.AddSQSPoller(queueUrl);
});

var app = builder.Build();

var task = app.RunAsync();


var publisher = app.Services.GetRequiredService<IMessagePublisher>();

var product = new ProductDTO("1", "Foo", "Bar");

await publisher.PublishAsync(product);

var cancelSource = new CancellationTokenSource();
cancelSource.CancelAfter(5000);

try
{
    Task.WaitAll(new Task[] { task }, cancelSource.Token);
    Console.WriteLine("Application completed");
}
catch (OperationCanceledException) { }

using System.Text.Json.Serialization;

namespace MessagingLocalAOTTest;

public record ProductDTO(string ID, string Name, string Description);

[JsonSerializable(typeof(ProductDTO))]
[JsonSourceGenerationOptions(UseStringEnumConverter = true)]
public partial class MessageFunctionJsonSerializerContext : JsonSerializerContext
{
}

using AWS.Messaging;

namespace MessagingLocalAOTTest;

public class ProductHandler : IMessageHandler<ProductDTO>
{
    public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ProductDTO> messageEnvelope, CancellationToken token = default)
    {
        Console.WriteLine($"Saving product: {messageEnvelope.Message.Name}");
        return Task.FromResult(MessageProcessStatus.Success());
    }
}

PS C:\Users\gcbeatty\RiderProjects\ConsoleApp1> dotnet publish -r win-x64 -c Release
Restore complete (0.9s)
  ConsoleApp1 succeeded (21.6s) → ConsoleApp1\bin\Release\net9.0\win-x64\publish\

Build succeeded in 22.9s

PS C:\Users\gcbeatty\RiderProjects\ConsoleApp1> .\ConsoleApp1\bin\Release\net9.0\win-x64\publish\ConsoleApp1.exe
info: AWSSDK[0]   
      Found AWS options in IConfiguration
info: AWSSDK[0]
      Found credentials using the AWS SDK's default credential search
info: AWS.Messaging.Services.MessagePumpService[0]
      Starting polling: https://sqs.us-east-1.amazonaws.com/147997163238/newqueue
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: C:\Users\gcbeatty\RiderProjects\ConsoleApp1
Saving product: Foo

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@GarrettBeatty GarrettBeatty changed the title from "Refactorrebase" to "Refactor serialization logic and remove JsonSerializer.Deserialize use for entire envelope" Mar 19, 2025
@GarrettBeatty GarrettBeatty marked this pull request as ready for review March 19, 2025 14:29
rootCopy = document.RootElement.Clone();
}

var parsers = new IMessageParser[]
Contributor

Instead of recreating this array of stateless parsers for every message, should we make the array a static field of the EnvelopeSerializer?

Contributor Author

fixed
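For readers following along, the suggestion amounts to something like the sketch below. The types here (`IMessageParserSketch`, `SnsParserSketch`, `SqsParserSketch`, `EnvelopeSerializerSketch`) are simplified, hypothetical stand-ins, not the library's actual classes; the point is only that stateless parsers can be allocated once and shared.

```csharp
using System;
using System.Linq;
using System.Text.Json;

using var document = JsonDocument.Parse("""{ "Type": "Notification", "Message": "{}" }""");

// Every message reuses the same parser instances; nothing is allocated per message.
var first = EnvelopeSerializerSketch.Parsers.First(p => p.CanParse(document.RootElement));
Console.WriteLine(first.GetType().Name);

// Hypothetical, simplified stand-in for the library's IMessageParser.
interface IMessageParserSketch
{
    bool CanParse(JsonElement root);
}

sealed class SnsParserSketch : IMessageParserSketch
{
    // Assumed detection rule, taken from the "Type": "Notification" hint in this thread.
    public bool CanParse(JsonElement root) =>
        root.TryGetProperty("Type", out var type) && type.GetString() == "Notification";
}

sealed class SqsParserSketch : IMessageParserSketch
{
    // Fallback parser: accepts anything.
    public bool CanParse(JsonElement root) => true;
}

static class EnvelopeSerializerSketch
{
    // Created once for the lifetime of the process because the parsers hold no state.
    public static readonly IMessageParserSketch[] Parsers =
    {
        new SnsParserSketch(),
        new SqsParserSketch(),
    };
}
```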

var (envelopeJson, metadata) = await ParseOuterWrapper(sqsMessage);

// Parse just the type field first to get the correct mapping
var messageType = GetMessageTypeFromEnvelope(envelopeJson);
Contributor

GetMessageTypeFromEnvelope does a full JSON document parse just to get the type, and then we look up the mapping. Can't we put this logic in DeserializeEnvelope, which also does a JSON parse? We wouldn't pass the message type and mapping into DeserializeEnvelope; it would determine those values after it does its parse.

Contributor Author

I've updated the code to remove GetMessageTypeFromEnvelope and determine the type in the same method as DeserializeEnvelope.
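A minimal sketch of the single-parse approach discussed here, assuming a hypothetical `typeMapping` dictionary in place of the library's real subscriber mapping: the envelope is parsed once, the type is read from it, and the data field is deserialized from the same document.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical mapping from the envelope's "type" value to a .NET type.
var typeMapping = new Dictionary<string, Type>
{
    ["OrderCreated"] = typeof(Dictionary<string, int>),
};

var envelopeJson = """{ "type": "OrderCreated", "data": { "quantity": 3 } }""";

string messageType;
object? message;
using (var document = JsonDocument.Parse(envelopeJson))
{
    var root = document.RootElement;

    // Read the type and resolve the mapping from the document we already parsed.
    messageType = root.GetProperty("type").GetString()
        ?? throw new JsonException("The envelope has no type.");
    if (!typeMapping.TryGetValue(messageType, out var targetType))
    {
        throw new InvalidOperationException($"No mapping for '{messageType}'.");
    }

    // Deserialize the data field from the same document; no second parse is needed.
    message = root.GetProperty("data").Deserialize(targetType);
}

var order = (Dictionary<string, int>)message!;
Console.WriteLine($"{messageType}: quantity {order["quantity"]}");
```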

{
envelopeConfiguration.SNSMetadata.MessageAttributes = messageAttributes.Deserialize(MessagingJsonSerializerContext.Default.DictionarySNSMessageAttributeValue);
}
using var doc = JsonDocument.Parse(json);
Contributor

See my comment above about avoiding an additional parse of the document. I think we can get rid of this method and collapse this logic into the DeserializeEnvelope method.

Contributor Author

fixed

AWSAccount = GetJsonPropertyAsString(root, "account"),
AWSRegion = GetJsonPropertyAsString(root, "region"),
Resources = GetJsonPropertyAsList<string>(root, "resources")
rootCopy = document.RootElement.Clone();
Contributor

Why do we need to clone the element?

Contributor Author

I used Clone because the variable went out of scope: I was parsing inside a using statement. But I guess I can just remove the using so the document doesn't get disposed.

Contributor Author

@GarrettBeatty GarrettBeatty Mar 21, 2025

I double-checked this. Since JsonDocument implements IDisposable, we have to dispose it (here with a using statement), which requires us to clone anything that must outlive it. The alternative is JsonNode, which doesn't implement IDisposable, so we wouldn't have to clone anything.

Contributor Author

For now I kept it as-is, since I would need to update everything to use JsonNode.
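The trade-off described in these comments can be illustrated with a small standalone sketch (not code from this PR): an element read from a disposed JsonDocument is no longer usable, a cloned element is, and JsonNode avoids the question entirely.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Nodes;

var json = """{ "id": "order-123" }""";

JsonElement dangling, cloned;
using (var document = JsonDocument.Parse(json))
{
    dangling = document.RootElement;        // still backed by the document's buffers
    cloned = document.RootElement.Clone();  // independent copy that outlives the document
}

// Reading from the element of a disposed document is expected to throw.
var danglingThrew = false;
try
{
    dangling.GetProperty("id");
}
catch (ObjectDisposedException)
{
    danglingThrew = true;
}

// The cloned element remains valid after the original document is disposed.
var clonedId = cloned.GetProperty("id").GetString();

// JsonNode is not IDisposable, so there is nothing to dispose or clone.
var nodeId = JsonNode.Parse(json)!["id"]!.GetValue<string>();

Console.WriteLine($"dangling threw: {danglingThrew}, cloned: {clonedId}, node: {nodeId}");
```

The cost of Clone is a copy of the cloned element's JSON, which is why it matters for large messages.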

Contributor

The messages could be quite large, so cloning the whole message is something we really shouldn't do. I understand you need to dispose the JSON document, but you don't need to dispose it until the end of the method. What about something like this:

var document = JsonDocument.Parse(sqsMessage.Body);
try
{
    string currentMessageBody = sqsMessage.Body;
    var combinedMetadata = new MessageMetadata();

    // Try each parser in order
    foreach (var parser in _parsers.Where(p => p.CanParse(document.RootElement)))
    {
        // Example 1 (SNS message) flow:
        // 1. SNSMessageParser.CanParse = true (finds "Type": "Notification")
        // 2. parser.Parse extracts inner message and SNS metadata
        // 3. messageBody = contents of "Message" field
        // 4. metadata contains SNS information (TopicArn, MessageId, etc.)

        // Example 2 (Raw SQS) flow:
        // 1. SNSMessageParser.CanParse = false (no SNS properties)
        // 2. EventBridgeMessageParser.CanParse = false (no EventBridge properties)
        // 3. SQSMessageParser.CanParse = true (fallback)
        // 4. messageBody = original message
        // 5. metadata contains just SQS information
        var (messageBody, metadata) = parser.Parse(document.RootElement, sqsMessage);

        // Update the message body if this parser extracted an inner message
        if (!string.IsNullOrEmpty(messageBody))
        {
            // For Example 1:
            // - Updates currentMessageBody to inner message
            // - Creates new JsonElement for next parser to check

            // For Example 2:
            // - This block runs but messageBody is same as original
            currentMessageBody = messageBody;

            document.Dispose(); // Dispose the current JsonDocument before reassigning to the parsed message body.
            document = JsonDocument.Parse(messageBody);
        }

        // Combine metadata
        if (metadata.SQSMetadata != null) combinedMetadata.SQSMetadata = metadata.SQSMetadata;
        if (metadata.SNSMetadata != null) combinedMetadata.SNSMetadata = metadata.SNSMetadata;
        if (metadata.EventBridgeMetadata != null) combinedMetadata.EventBridgeMetadata = metadata.EventBridgeMetadata;
    }

    // Example 1 final return:
    // MessageBody = {
    //     "id": "order-123",
    //     "source": "com.myapp.orders",
    //     "type": "OrderCreated",
    //     "time": "2024-03-21T10:00:00Z",
    //     "data": { ... }
    // }
    // Metadata = {
    //     SNSMetadata: { TopicArn: "arn:aws...", MessageId: "abc-123" }
    // }

    // Example 2 final return:
    // MessageBody = {
    //     "id": "order-123",
    //     "source": "com.myapp.orders",
    //     "type": "OrderCreated",
    //     "time": "2024-03-21T10:00:00Z",
    //     "data": { ... }
    // }
    // Metadata = { } // Just basic SQS metadata

    return (currentMessageBody, combinedMetadata);
}
finally
{
    document.Dispose();
}

Contributor Author

fixed
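One detail worth spelling out about the suggested loop: Where is evaluated lazily and its lambda captures the document variable itself, so after the variable is reassigned inside the loop, the remaining parsers are tested against the new document. The sketch below shows the same mechanism with plain strings; the parser names and `CanParse` rules are hypothetical stand-ins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-ins: parser names instead of IMessageParser, a string instead of JsonDocument.
var parsers = new[] { "sns", "eventbridge", "sqs" };
var currentBody = "sns-wrapped";
var visited = new List<string>();

// The predicate runs once per element as the loop advances, and it reads
// whatever value currentBody holds at that moment.
foreach (var parser in parsers.Where(p => CanParse(p, currentBody)))
{
    visited.Add(parser);
    if (parser == "sns")
    {
        currentBody = "inner"; // the SNS parser "unwraps" the inner message
    }
}

Console.WriteLine(string.Join(", ", visited)); // prints "sns, sqs"

static bool CanParse(string parser, string body) => parser switch
{
    "sns" => body == "sns-wrapped",
    "eventbridge" => body == "eventbridge-wrapped",
    _ => true, // the SQS parser is the fallback
};
```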

@GarrettBeatty GarrettBeatty requested a review from normj March 21, 2025 15:41
@GarrettBeatty GarrettBeatty changed the base branch from v4sdk-development to gcbeatty/merge March 25, 2025 21:11
"Name": "AWS.Messaging",
"Type": "Minor",
"ChangelogMessages": [
"Refactor logic for serialization by splitting messaging parsing into multiple classes."
Contributor

I'm not sure this PR needs a change file, since on its own it doesn't mean much for our customers.

Contributor Author

Yeah, that's true. Since there isn't any actual new logic, I'll remove this change file.

Base automatically changed from gcbeatty/merge to v4sdk-development March 26, 2025 13:52
@GarrettBeatty GarrettBeatty requested a review from normj March 26, 2025 19:36
@GarrettBeatty GarrettBeatty added the "Release Not Needed" label (add this label if a PR does not need to be released) Mar 26, 2025
@GarrettBeatty GarrettBeatty merged commit e735bbf into v4sdk-development Mar 26, 2025
5 of 6 checks passed
@GarrettBeatty GarrettBeatty deleted the refactorrebase branch March 26, 2025 22:44