Skip to content

Commit 91901e5

Browse files
Merge pull request #7 from mortenlaursen/main
Allow custom JSON serialization options by @mortenlaursen
2 parents 34881bd + ebf80b2 commit 91901e5

14 files changed

+784
-541
lines changed

README.md

+23
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,29 @@ builder.Services.AddSwaggerGen();
7777
builder.Services.ConfigureMessagingServices(builder.Configuration, true);
7878
```
7979

80+
## Custom JsonSerializerOptions
81+
82+
You can customize JSON serialization in your application by injecting `JsonSerializerOptions` into the `ConfigureMessagingServices` method.
83+
84+
### How to Use
85+
Pass your `JsonSerializerOptions` when setting up messaging services. If not provided, default settings are used.
86+
87+
```csharp
88+
var builder = WebApplication.CreateBuilder(args);
89+
90+
var jsonOptions = new JsonSerializerOptions
91+
{
92+
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
93+
WriteIndented = true,
94+
};
95+
96+
builder.Services.ConfigureMessagingServices(
97+
builder.Configuration,
98+
useAzureCredentials: true,
99+
jsonSerializerOptions: jsonOptions
100+
);
101+
```
102+
80103
## Publishing to EventHub
81104

82105
To publish events to an EventHub you need an instance of `IEventHubPublisher`, this can be constructed via the `IEventHubPublisherFactory` which exposes the `Create(string eventHubName)` method
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Atc.Azure.Messaging.Serialization;
2+
13
namespace Atc.Azure.Messaging.EventHub;
24

35
[SuppressMessage(
@@ -8,24 +10,28 @@ internal sealed class EventHubCredentialsPublisherFactory : IEventHubPublisherFa
810
{
911
private readonly string fullyQualifiedNamespace;
1012
private readonly DefaultAzureCredentialOptions credentialOptions;
13+
private readonly IMessagePayloadSerializer messagePayloadSerializer;
1114

1215
public EventHubCredentialsPublisherFactory(
1316
EventHubOptions options,
1417
EnvironmentOptions environmentOptions,
1518
ClientAuthorizationOptions clientCredentialOptions,
16-
IAzureCredentialOptionsProvider credentialOptionsProvider)
19+
IAzureCredentialOptionsProvider credentialOptionsProvider,
20+
IMessagePayloadSerializer messagePayloadSerializer)
1721
{
18-
this.fullyQualifiedNamespace = options.FullyQualifiedNamespace;
19-
this.credentialOptions = credentialOptionsProvider
20-
.GetAzureCredentialOptions(
21-
environmentOptions,
22-
clientCredentialOptions);
22+
this.messagePayloadSerializer = messagePayloadSerializer;
23+
this.fullyQualifiedNamespace = options.FullyQualifiedNamespace;
24+
this.credentialOptions = credentialOptionsProvider
25+
.GetAzureCredentialOptions(
26+
environmentOptions,
27+
clientCredentialOptions);
2328
}
24-
25-
public IEventHubPublisher Create(string eventHubName)
29+
30+
public IEventHubPublisher Create(string eventHubName)
2631
=> new EventHubPublisher(
27-
new EventHubProducerClient(
28-
fullyQualifiedNamespace,
29-
eventHubName,
30-
new DefaultAzureCredential(credentialOptions)));
32+
new EventHubProducerClient(
33+
fullyQualifiedNamespace,
34+
eventHubName,
35+
new DefaultAzureCredential(credentialOptions)),
36+
messagePayloadSerializer);
3137
}

src/Atc.Azure.Messaging/EventHub/EventHubPublisher.cs

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1+
using Atc.Azure.Messaging.Serialization;
2+
13
namespace Atc.Azure.Messaging.EventHub;
24

35
internal sealed class EventHubPublisher : IEventHubPublisher
46
{
57
private readonly EventHubProducerClient client;
8+
private readonly IMessagePayloadSerializer messagePayloadSerializer;
69

7-
public EventHubPublisher(EventHubProducerClient client)
10+
public EventHubPublisher(
11+
EventHubProducerClient client,
12+
IMessagePayloadSerializer messagePayloadSerializer)
813
{
914
this.client = client;
15+
this.messagePayloadSerializer = messagePayloadSerializer;
1016
}
1117

1218
public Task PublishAsync(
@@ -15,7 +21,18 @@ public Task PublishAsync(
1521
CancellationToken cancellationToken = default)
1622
{
1723
return PerformPublishAsync(
18-
JsonSerializer.Serialize(message),
24+
messagePayloadSerializer.Serialize(message),
25+
messageProperties,
26+
cancellationToken);
27+
}
28+
29+
public Task PublishAsync(
30+
string message,
31+
IDictionary<string, string>? messageProperties = null,
32+
CancellationToken cancellationToken = default)
33+
{
34+
return PerformPublishAsync(
35+
message,
1936
messageProperties,
2037
cancellationToken);
2138
}
@@ -39,10 +56,7 @@ private Task PerformPublishAsync(
3956
}
4057

4158
return client.SendAsync(
42-
new[]
43-
{
44-
eventData,
45-
},
59+
new[] { eventData, },
4660
cancellationToken);
4761
}
4862

Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Atc.Azure.Messaging.Serialization;
2+
13
namespace Atc.Azure.Messaging.EventHub;
24

35
[SuppressMessage(
@@ -7,15 +9,20 @@ namespace Atc.Azure.Messaging.EventHub;
79
internal sealed class EventHubPublisherFactory : IEventHubPublisherFactory
810
{
911
private readonly string connectionString;
12+
private readonly IMessagePayloadSerializer messagePayloadSerializer;
1013

11-
public EventHubPublisherFactory(EventHubOptions options)
14+
public EventHubPublisherFactory(
15+
EventHubOptions options,
16+
IMessagePayloadSerializer messagePayloadSerializer)
1217
{
13-
this.connectionString = options.ConnectionString;
18+
this.messagePayloadSerializer = messagePayloadSerializer;
19+
connectionString = options.ConnectionString;
1420
}
1521

1622
public IEventHubPublisher Create(string eventHubName)
1723
=> new EventHubPublisher(
1824
new EventHubProducerClient(
1925
connectionString,
20-
eventHubName));
26+
eventHubName),
27+
messagePayloadSerializer);
2128
}

src/Atc.Azure.Messaging/EventHub/IEventHubPublisher.cs

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
namespace Atc.Azure.Messaging.EventHub;
22

3-
/// <summary>
4-
/// Publisher responsible for publishing objects with metadata to a specific EventHub.
3+
/// <summary>
4+
/// Publisher responsible for publishing objects with metadata to a specific EventHub.
55
/// </summary>
66
/// <remarks>
77
/// Is safe to cache and use in singletons for the lifetime of the application.
@@ -12,4 +12,9 @@ Task PublishAsync(
1212
object message,
1313
IDictionary<string, string>? messageProperties = null,
1414
CancellationToken cancellationToken = default);
15+
16+
Task PublishAsync(
17+
string message,
18+
IDictionary<string, string>? messageProperties = null,
19+
CancellationToken cancellationToken = default);
1520
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace Atc.Azure.Messaging.Serialization;
2+
3+
public interface IMessagePayloadSerializer
4+
{
5+
string Serialize<T>(T value);
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace Atc.Azure.Messaging.Serialization;
2+
3+
public class MessagePayloadSerializer : IMessagePayloadSerializer
4+
{
5+
private readonly JsonSerializerOptions options;
6+
7+
public MessagePayloadSerializer(JsonSerializerOptions options)
8+
{
9+
this.options = options;
10+
}
11+
12+
public string Serialize<T>(T value)
13+
=> JsonSerializer.Serialize(value, options);
14+
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
namespace Atc.Azure.Messaging.ServiceBus;
2-
1+
namespace Atc.Azure.Messaging.ServiceBus;
2+
33
/// <summary>
44
/// Publisher responsible for publishing objects with metadata to a specific ServiceBus.
5-
/// </summary>
6-
public interface IServiceBusPublisher
7-
{
5+
/// </summary>
6+
public interface IServiceBusPublisher
7+
{
88
/// <summary>
99
/// Publishes a message.
1010
/// </summary>
@@ -13,14 +13,32 @@ public interface IServiceBusPublisher
1313
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
1414
/// <param name="properties">Optional custom metadata about the message.</param>
1515
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
16-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
17-
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
18-
Task PublishAsync(
19-
string topicOrQueue,
20-
object message,
21-
string? sessionId = null,
22-
IDictionary<string, string>? properties = null,
23-
TimeSpan? timeToLive = null,
16+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
17+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
18+
Task PublishAsync(
19+
string topicOrQueue,
20+
object message,
21+
string? sessionId = null,
22+
IDictionary<string, string>? properties = null,
23+
TimeSpan? timeToLive = null,
24+
CancellationToken cancellationToken = default);
25+
26+
/// <summary>
27+
/// Publishes a message.
28+
/// </summary>
29+
/// <param name="topicOrQueue">The topic or queue name.</param>
30+
/// <param name="message">The message to be published, in a serialized format.</param>
31+
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
32+
/// <param name="properties">Optional custom metadata about the message.</param>
33+
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
34+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
35+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
36+
Task PublishAsync(
37+
string topicOrQueue,
38+
string message,
39+
string? sessionId = null,
40+
IDictionary<string, string>? properties = null,
41+
TimeSpan? timeToLive = null,
2442
CancellationToken cancellationToken = default);
2543

2644
/// <summary>
@@ -31,14 +49,14 @@ Task PublishAsync(
3149
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
3250
/// <param name="properties">Optional custom metadata about the message.</param>
3351
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
34-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
35-
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
36-
Task PublishAsync(
37-
string topicOrQueue,
38-
IReadOnlyCollection<object> messages,
39-
string? sessionId = null,
40-
IDictionary<string, string>? properties = null,
41-
TimeSpan? timeToLive = null,
52+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
53+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
54+
Task PublishAsync(
55+
string topicOrQueue,
56+
IReadOnlyCollection<object> messages,
57+
string? sessionId = null,
58+
IDictionary<string, string>? properties = null,
59+
TimeSpan? timeToLive = null,
4260
CancellationToken cancellationToken = default);
4361

4462
/// <summary>
@@ -50,26 +68,26 @@ Task PublishAsync(
5068
/// <param name="sessionId">Optional id for appending the message to a known session. If not set, then defaults to a new session.</param>
5169
/// <param name="properties">Optional custom metadata about the message.</param>
5270
/// <param name="timeToLive">Optional <see cref="TimeSpan"/> for message to be consumed. If not set, then defaults to the value specified on queue or topic.</param>
53-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
54-
/// <returns>A <see cref="Task"/> containing the sequence number of the scheduled message.</returns>
55-
Task<long> SchedulePublishAsync(
56-
string topicOrQueue,
57-
object message,
58-
DateTimeOffset scheduledEnqueueTime,
59-
string? sessionId = null,
60-
IDictionary<string, string>? properties = null,
61-
TimeSpan? timeToLive = null,
71+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
72+
/// <returns>A <see cref="Task"/> containing the sequence number of the scheduled message.</returns>
73+
Task<long> SchedulePublishAsync(
74+
string topicOrQueue,
75+
object message,
76+
DateTimeOffset scheduledEnqueueTime,
77+
string? sessionId = null,
78+
IDictionary<string, string>? properties = null,
79+
TimeSpan? timeToLive = null,
6280
CancellationToken cancellationToken = default);
6381

6482
/// <summary>
6583
/// Cancels a scheduled publish of a message if it has not been published yet.
6684
/// </summary>
6785
/// <param name="topicOrQueue">The topic or queue name.</param>
6886
/// <param name="sequenceNumber">The sequence number of the scheduled message to cancel.</param>
69-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
70-
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
71-
Task CancelScheduledPublishAsync(
72-
string topicOrQueue,
73-
long sequenceNumber,
74-
CancellationToken cancellationToken = default);
87+
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used.</param>
88+
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
89+
Task CancelScheduledPublishAsync(
90+
string topicOrQueue,
91+
long sequenceNumber,
92+
CancellationToken cancellationToken = default);
7593
}

0 commit comments

Comments
 (0)