@@ -67,7 +67,8 @@ public async ValueTask<MessageEnvelope<T>> CreateEnvelopeAsync<T>(T message)
67
67
Version = CLOUD_EVENT_SPEC_VERSION ,
68
68
MessageTypeIdentifier = publisherMapping . MessageTypeIdentifier ,
69
69
TimeStamp = timeStamp ,
70
- Message = message
70
+ Message = message ,
71
+ // DataContentType = "" // TODO
71
72
} ;
72
73
}
73
74
@@ -91,10 +92,12 @@ public async ValueTask<string> SerializeAsync<T>(MessageEnvelope<T> envelope)
91
92
[ "source" ] = envelope . Source ? . ToString ( ) ,
92
93
[ "specversion" ] = envelope . Version ,
93
94
[ "type" ] = envelope . MessageTypeIdentifier ,
94
- [ "time" ] = envelope . TimeStamp ,
95
- [ "data" ] = _messageSerializer . Serialize ( message )
95
+ [ "time" ] = envelope . TimeStamp
96
96
} ;
97
97
98
+ // See https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/formats/json-format.md#31-handling-of-data for more details.
99
+ SerializeData ( message , blob , envelope . DataContentType ) ;
100
+
98
101
// Write any Metadata as top-level keys
99
102
// This may be useful for any extensions defined in
100
103
// https://github.com/cloudevents/spec/tree/main/cloudevents/extensions
@@ -107,17 +110,17 @@ public async ValueTask<string> SerializeAsync<T>(MessageEnvelope<T> envelope)
107
110
}
108
111
109
112
var jsonString = blob . ToJsonString ( ) ;
110
- var serializedMessage = await InvokePostSerializationCallback ( jsonString ) ;
113
+ var finalSerializedMessage = await InvokePostSerializationCallback ( jsonString ) ;
111
114
112
115
if ( _messageConfiguration . LogMessageContent )
113
116
{
114
- _logger . LogTrace ( "Serialized the MessageEnvelope object as the following raw string:\n {SerializedMessage}" , serializedMessage ) ;
117
+ _logger . LogTrace ( "Serialized the MessageEnvelope object as the following raw string:\n {SerializedMessage}" , finalSerializedMessage ) ;
115
118
}
116
119
else
117
120
{
118
121
_logger . LogTrace ( "Serialized the MessageEnvelope object to a raw string" ) ;
119
122
}
120
- return serializedMessage ;
123
+ return finalSerializedMessage ;
121
124
}
122
125
catch ( JsonException ) when ( ! _messageConfiguration . LogMessageContent )
123
126
{
@@ -131,6 +134,16 @@ public async ValueTask<string> SerializeAsync<T>(MessageEnvelope<T> envelope)
131
134
}
132
135
}
133
136
137
+ private string ExtractDataContent ( JsonElement dataElement , string ? dataContentType )
138
+ {
139
+ return IsJsonContentType ( dataContentType )
140
+ ? dataElement . ValueKind == JsonValueKind . String
141
+ ? dataElement . GetString ( ) !
142
+ : dataElement . GetRawText ( )
143
+ : dataElement . GetString ( )
144
+ ?? throw new InvalidDataException ( "Data must be a string for non-JSON content type" ) ;
145
+ }
146
+
134
147
/// <inheritdoc/>
135
148
public async ValueTask < ConvertToEnvelopeResult > ConvertToEnvelopeAsync ( Message sqsMessage )
136
149
{
@@ -143,6 +156,7 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
143
156
var subscriberMapping = GetAndValidateSubscriberMapping ( parsedResult . Envelope . MessageTypeIdentifier ) ;
144
157
var deserializedMessage = DeserializeDataContent (
145
158
parsedResult . Envelope . Message ! ,
159
+ parsedResult . Envelope . DataContentType ,
146
160
subscriberMapping . MessageType ) ;
147
161
148
162
// Create and populate final envelope
@@ -174,11 +188,35 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
174
188
private async Task < ParsedEnvelopeResult > ParseMessageEnvelope ( Message sqsMessage )
175
189
{
176
190
sqsMessage . Body = await InvokePreDeserializationCallback ( sqsMessage . Body ) ;
177
- var messageEnvelopeConfiguration = GetMessageEnvelopeConfiguration ( sqsMessage ) ;
178
- var intermediateEnvelope = JsonSerializer . Deserialize < MessageEnvelope < string > > ( messageEnvelopeConfiguration . MessageEnvelopeBody ! ) ! ;
191
+ var config = GetMessageEnvelopeConfiguration ( sqsMessage ) ;
192
+
193
+ using var document = JsonDocument . Parse ( config . MessageEnvelopeBody ! ) ;
194
+ var root = document . RootElement ;
195
+
196
+ if ( ! root . TryGetProperty ( "data" , out var dataElement ) )
197
+ {
198
+ throw new InvalidDataException ( "Message envelope is missing required 'data' field" ) ;
199
+ }
200
+
201
+ string ? dataContentType = root . TryGetProperty ( "datacontenttype" , out var contentTypeElement )
202
+ ? contentTypeElement . GetString ( )
203
+ : "application/json" ;
204
+
205
+ string dataContent = ExtractDataContent ( dataElement , dataContentType ) ;
206
+
207
+ // Create intermediate envelope with all properties
208
+ var envelopeJson = new JsonObject ( ) ;
209
+ foreach ( var property in root . EnumerateObject ( ) )
210
+ {
211
+ envelopeJson [ property . Name ] = property . Name == "data"
212
+ ? JsonValue . Create ( dataContent )
213
+ : JsonNode . Parse ( property . Value . GetRawText ( ) ) ;
214
+ }
215
+
216
+ var intermediateEnvelope = JsonSerializer . Deserialize < MessageEnvelope < string > > ( envelopeJson . ToJsonString ( ) ) ! ;
179
217
ValidateMessageEnvelope ( intermediateEnvelope ) ;
180
218
181
- return new ParsedEnvelopeResult ( intermediateEnvelope , messageEnvelopeConfiguration ) ;
219
+ return new ParsedEnvelopeResult ( intermediateEnvelope , config ) ;
182
220
}
183
221
184
222
private MessageEnvelope CreateFinalEnvelope (
@@ -204,6 +242,7 @@ private MessageEnvelope CreateFinalEnvelope(
204
242
finalEnvelope . MessageTypeIdentifier = intermediateEnvelope . MessageTypeIdentifier ;
205
243
finalEnvelope . TimeStamp = intermediateEnvelope . TimeStamp ;
206
244
finalEnvelope . Metadata = intermediateEnvelope . Metadata ;
245
+ finalEnvelope . DataContentType = intermediateEnvelope . DataContentType ;
207
246
finalEnvelope . SQSMetadata = config . SQSMetadata ;
208
247
finalEnvelope . SNSMetadata = config . SNSMetadata ;
209
248
finalEnvelope . EventBridgeMetadata = config . EventBridgeMetadata ;
@@ -232,9 +271,87 @@ private SubscriberMapping GetAndValidateSubscriberMapping(string messageTypeIden
232
271
return subscriberMapping ;
233
272
}
234
273
235
- private object DeserializeDataContent ( string dataContent , Type messageType )
274
+ private object DeserializeDataContent ( string dataContent , string ? dataContentType , Type messageType )
236
275
{
237
- return _messageSerializer . Deserialize ( dataContent , messageType ) ;
276
+ if ( IsJsonContentType ( dataContentType ) )
277
+ {
278
+ return _messageSerializer . Deserialize ( dataContent , messageType ) ;
279
+ }
280
+
281
+ if ( messageType == typeof ( string ) )
282
+ {
283
+ return dataContent ;
284
+ }
285
+
286
+ throw new InvalidOperationException (
287
+ $ "Cannot deserialize non-JSON content type { dataContentType } to type { messageType } ") ;
288
+ }
289
+
290
+ private void SerializeData < T > ( T message , JsonObject blob , string ? dataContentType )
291
+ {
292
+ if ( message == null )
293
+ {
294
+ throw new ArgumentNullException ( "The underlying application message cannot be null" ) ;
295
+ }
296
+
297
+ // Serialize the message
298
+ var serializedMessage = _messageSerializer . Serialize ( message ) ;
299
+
300
+ // Determine if the serialized message is valid JSON
301
+ // Wed do this because _messageSerializer is injected and there is no guarantee that it serializes to json.
302
+ bool isJson = IsValidJson ( serializedMessage ) ;
303
+ blob [ "datacontenttype" ] = dataContentType ;
304
+
305
+ if ( IsJsonContentType ( dataContentType ) )
306
+ {
307
+ if ( isJson )
308
+ {
309
+ // If it's valid JSON, parse it to maintain structure
310
+ blob [ "data" ] = JsonNode . Parse ( serializedMessage ) ;
311
+ }
312
+ else
313
+ {
314
+ // If it's not valid JSON but content type indicates JSON,
315
+ // log warning and store as string
316
+ _logger . LogWarning ( "Data was serialized as non-JSON, but datacontenttype indicates JSON format. Storing as string." ) ;
317
+ blob [ "data" ] = serializedMessage ;
318
+ }
319
+ }
320
+ else
321
+ {
322
+ // For non-JSON content types, store as string
323
+ blob [ "data" ] = serializedMessage ;
324
+ }
325
+ }
326
+
327
+ private bool IsJsonContentType ( string ? contentType )
328
+ {
329
+ if ( string . IsNullOrEmpty ( contentType ) )
330
+ {
331
+ // If datacontenttype is unspecified, treat as application/json
332
+ return true ;
333
+ }
334
+
335
+ // Strip any parameters (anything after ';')
336
+ var mediaType = contentType . Split ( ';' ) [ 0 ] . Trim ( ) . ToLowerInvariant ( ) ;
337
+
338
+ return mediaType . EndsWith ( "/json" ) || // Matches */json
339
+ ( mediaType . Contains ( '/' ) && mediaType . EndsWith ( "+json" ) ) ; // Matches */*+json
340
+ }
341
+
342
+ private bool IsValidJson ( string strInput )
343
+ {
344
+ if ( string . IsNullOrWhiteSpace ( strInput ) ) return false ;
345
+
346
+ try
347
+ {
348
+ JsonDocument . Parse ( strInput ) ;
349
+ return true ;
350
+ }
351
+ catch ( JsonException )
352
+ {
353
+ return false ;
354
+ }
238
355
}
239
356
240
357
private void ValidateMessageEnvelope < T > ( MessageEnvelope < T > ? messageEnvelope )
0 commit comments