@@ -29,6 +29,15 @@ internal class EnvelopeSerializer : IEnvelopeSerializer
29
29
private readonly IMessageSourceHandler _messageSourceHandler ;
30
30
private readonly ILogger < EnvelopeSerializer > _logger ;
31
31
32
+ // Order matters for the SQS parser (must be last), but SNS and EventBridge parsers
33
+ // can be in any order since they check for different, mutually exclusive properties
34
+ private static readonly IMessageParser [ ] _parsers = new IMessageParser [ ]
35
+ {
36
+ new SNSMessageParser ( ) , // Checks for SNS-specific properties (Type, TopicArn)
37
+ new EventBridgeMessageParser ( ) , // Checks for EventBridge properties (detail-type, detail)
38
+ new SQSMessageParser ( ) // Fallback parser - must be last
39
+ } ;
40
+
32
41
public EnvelopeSerializer (
33
42
ILogger < EnvelopeSerializer > logger ,
34
43
IMessageConfiguration messageConfiguration ,
@@ -142,12 +151,8 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
142
151
// Get the raw envelope JSON and metadata from the appropriate wrapper (SNS/EventBridge/SQS)
143
152
var ( envelopeJson , metadata ) = await ParseOuterWrapper ( sqsMessage ) ;
144
153
145
- // Parse just the type field first to get the correct mapping
146
- var messageType = GetMessageTypeFromEnvelope ( envelopeJson ) ;
147
- var subscriberMapping = GetAndValidateSubscriberMapping ( messageType ) ;
148
-
149
154
// Create and populate the envelope with the correct type
150
- var envelope = DeserializeEnvelope ( envelopeJson , subscriberMapping . MessageType , subscriberMapping ) ;
155
+ var ( envelope , subscriberMapping ) = DeserializeEnvelope ( envelopeJson ) ;
151
156
152
157
// Add metadata from outer wrapper
153
158
envelope . SQSMetadata = metadata . SQSMetadata ;
@@ -169,10 +174,15 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
169
174
}
170
175
}
171
176
172
- private MessageEnvelope DeserializeEnvelope ( string envelopeString , Type messageType , SubscriberMapping subscriberMapping )
177
+ private ( MessageEnvelope Envelope , SubscriberMapping Mapping ) DeserializeEnvelope ( string envelopeString )
173
178
{
174
179
using var document = JsonDocument . Parse ( envelopeString ) ;
175
180
var root = document . RootElement ;
181
+
182
+ // Get the message type and lookup mapping first
183
+ var messageType = root . GetProperty ( "type" ) . GetString ( ) ?? throw new InvalidDataException ( "Message type identifier not found in envelope" ) ;
184
+ var subscriberMapping = GetAndValidateSubscriberMapping ( messageType ) ;
185
+
176
186
var envelope = subscriberMapping . MessageEnvelopeFactory . Invoke ( ) ;
177
187
178
188
try
@@ -206,10 +216,10 @@ private MessageEnvelope DeserializeEnvelope(string envelopeString, Type messageT
206
216
207
217
// Deserialize the message content using the custom serializer
208
218
var dataContent = JsonPropertyHelper . GetRequiredProperty ( root , "data" , element => element . GetString ( ) ! ) ;
209
- var message = _messageSerializer . Deserialize ( dataContent , messageType ) ;
219
+ var message = _messageSerializer . Deserialize ( dataContent , subscriberMapping . MessageType ) ;
210
220
envelope . SetMessage ( message ) ;
211
221
212
- return envelope ;
222
+ return ( envelope , subscriberMapping ) ;
213
223
}
214
224
catch ( Exception ex )
215
225
{
@@ -218,43 +228,79 @@ private MessageEnvelope DeserializeEnvelope(string envelopeString, Type messageT
218
228
}
219
229
}
220
230
221
- private static string GetMessageTypeFromEnvelope ( string json )
222
- {
223
- using var doc = JsonDocument . Parse ( json ) ;
224
- return doc . RootElement . GetProperty ( "type" ) . GetString ( )
225
- ?? throw new InvalidDataException ( "Message type identifier not found in envelope" ) ;
226
- }
227
-
228
231
private async Task < ( string MessageBody , MessageMetadata Metadata ) > ParseOuterWrapper ( Message sqsMessage )
229
232
{
230
233
sqsMessage . Body = await InvokePreDeserializationCallback ( sqsMessage . Body ) ;
231
234
235
+ // Example 1: SNS-wrapped message in SQS
236
+ /*
237
+ sqsMessage.Body = {
238
+ "Type": "Notification",
239
+ "MessageId": "abc-123",
240
+ "TopicArn": "arn:aws:sns:us-east-1:123456789012:MyTopic",
241
+ "Message": {
242
+ "id": "order-123",
243
+ "source": "com.myapp.orders",
244
+ "type": "OrderCreated",
245
+ "time": "2024-03-21T10:00:00Z",
246
+ "data": {
247
+ "orderId": "12345",
248
+ "amount": 99.99
249
+ }
250
+ }
251
+ }
252
+ */
253
+
254
+ // Example 2: Raw SQS message
255
+ /*
256
+ sqsMessage.Body = {
257
+ "id": "order-123",
258
+ "source": "com.myapp.orders",
259
+ "type": "OrderCreated",
260
+ "time": "2024-03-21T10:00:00Z",
261
+ "data": {
262
+ "orderId": "12345",
263
+ "amount": 99.99
264
+ }
265
+ }
266
+ */
267
+
232
268
JsonElement rootCopy ;
233
269
using ( var document = JsonDocument . Parse ( sqsMessage . Body ) )
234
270
{
235
271
rootCopy = document . RootElement . Clone ( ) ;
236
272
}
237
273
238
- var parsers = new IMessageParser [ ]
239
- {
240
- new SNSMessageParser ( ) ,
241
- new EventBridgeMessageParser ( ) ,
242
- new SQSMessageParser ( )
243
- } ;
244
-
245
274
string currentMessageBody = sqsMessage . Body ;
246
275
var combinedMetadata = new MessageMetadata ( ) ;
247
276
248
- // Try all parsers in order
249
- foreach ( var parser in parsers . Where ( p => p . CanParse ( rootCopy ) ) )
277
+ // Try each parser in order
278
+ foreach ( var parser in _parsers . Where ( p => p . CanParse ( rootCopy ) ) )
250
279
{
280
+ // Example 1 (SNS message) flow:
281
+ // 1. SNSMessageParser.CanParse = true (finds "Type": "Notification")
282
+ // 2. parser.Parse extracts inner message and SNS metadata
283
+ // 3. messageBody = contents of "Message" field
284
+ // 4. metadata contains SNS information (TopicArn, MessageId, etc.)
285
+
286
+ // Example 2 (Raw SQS) flow:
287
+ // 1. SNSMessageParser.CanParse = false (no SNS properties)
288
+ // 2. EventBridgeMessageParser.CanParse = false (no EventBridge properties)
289
+ // 3. SQSMessageParser.CanParse = true (fallback)
290
+ // 4. messageBody = original message
291
+ // 5. metadata contains just SQS information
251
292
var ( messageBody , metadata ) = parser . Parse ( rootCopy , sqsMessage ) ;
252
293
253
294
// Update the message body if this parser extracted an inner message
254
295
if ( ! string . IsNullOrEmpty ( messageBody ) )
255
296
{
297
+ // For Example 1:
298
+ // - Updates currentMessageBody to inner message
299
+ // - Creates new JsonElement for next parser to check
300
+
301
+ // For Example 2:
302
+ // - This block runs but messageBody is same as original
256
303
currentMessageBody = messageBody ;
257
- // Parse the new message body for the next iteration
258
304
using var newDoc = JsonDocument . Parse ( messageBody ) ;
259
305
rootCopy = newDoc . RootElement . Clone ( ) ;
260
306
}
@@ -265,6 +311,28 @@ private static string GetMessageTypeFromEnvelope(string json)
265
311
if ( metadata . EventBridgeMetadata != null ) combinedMetadata . EventBridgeMetadata = metadata . EventBridgeMetadata ;
266
312
}
267
313
314
+ // Example 1 final return:
315
+ // MessageBody = {
316
+ // "id": "order-123",
317
+ // "source": "com.myapp.orders",
318
+ // "type": "OrderCreated",
319
+ // "time": "2024-03-21T10:00:00Z",
320
+ // "data": { ... }
321
+ // }
322
+ // Metadata = {
323
+ // SNSMetadata: { TopicArn: "arn:aws...", MessageId: "abc-123" }
324
+ // }
325
+
326
+ // Example 2 final return:
327
+ // MessageBody = {
328
+ // "id": "order-123",
329
+ // "source": "com.myapp.orders",
330
+ // "type": "OrderCreated",
331
+ // "time": "2024-03-21T10:00:00Z",
332
+ // "data": { ... }
333
+ // }
334
+ // Metadata = { } // Just basic SQS metadata
335
+
268
336
return ( currentMessageBody , combinedMetadata ) ;
269
337
}
270
338
0 commit comments