-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathMessageDispatcher.cs
106 lines (88 loc) · 3.49 KB
/
MessageDispatcher.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
using JustSaying.Messaging.Channels.Context;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware;
using JustSaying.Messaging.Monitoring;
using JustSaying.Models;
using Microsoft.Extensions.Logging;
namespace JustSaying.AwsTools.MessageHandling.Dispatch;
/// <summary>
/// Deserializes a received queue message, looks up the middleware registered for
/// its queue and message type, and runs that middleware against the message.
/// </summary>
internal class MessageDispatcher : IMessageDispatcher
{
    private readonly IMessageMonitor _messagingMonitor;
    private readonly MiddlewareMap _middlewareMap;

    // Instance-level and readonly: this field is assigned in the instance constructor,
    // so keeping it static would let each newly constructed dispatcher replace the
    // logger used by every other dispatcher instance.
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a dispatcher.
    /// </summary>
    /// <param name="messagingMonitor">Monitor that is notified of deserialization errors.</param>
    /// <param name="middlewareMap">Lookup of middleware by queue name and message type.</param>
    /// <param name="loggerFactory">Factory used to create the "JustSaying" logger.</param>
    public MessageDispatcher(
        IMessageMonitor messagingMonitor,
        MiddlewareMap middlewareMap,
        ILoggerFactory loggerFactory)
    {
        _messagingMonitor = messagingMonitor;
        _middlewareMap = middlewareMap;
        _logger = loggerFactory.CreateLogger("JustSaying");
    }

    /// <summary>
    /// Deserializes the message in <paramref name="messageContext"/> and runs the matching middleware.
    /// </summary>
    /// <remarks>
    /// Returns without running any middleware when cancellation has already been requested,
    /// when deserialization fails, or when no middleware is registered for the
    /// queue name / message type pair (the latter is logged as an error).
    /// </remarks>
    /// <param name="messageContext">The received message together with its queue details.</param>
    /// <param name="cancellationToken">Token checked up front and passed on to the middleware.</param>
    /// <returns>A task that completes when dispatching has finished.</returns>
    public async Task DispatchMessageAsync(
        IQueueMessageContext messageContext,
        CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        (bool success, Message typedMessage, MessageAttributes attributes) =
            await DeserializeMessage(messageContext, cancellationToken).ConfigureAwait(false);

        if (!success)
        {
            // DeserializeMessage has already logged and reported the failure.
            return;
        }

        var messageType = typedMessage.GetType();
        var middleware = _middlewareMap.Get(messageContext.QueueName, messageType);

        if (middleware == null)
        {
            _logger.LogError(
                "Failed to dispatch. Middleware for message of type '{MessageTypeName}' not found in middleware map.",
                messageType.FullName);
            return;
        }

        // messageContext is intentionally passed twice: HandleMessageContext takes it for two
        // separate constructor parameters (presumably the visibility-updater and
        // message-deleter roles - confirm against HandleMessageContext's constructor).
        var handleContext = new HandleMessageContext(
            messageContext.QueueName,
            messageContext.Message,
            typedMessage,
            messageType,
            messageContext,
            messageContext,
            messageContext.QueueUri,
            attributes);

        await middleware.RunAsync(handleContext, null, cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Converts the raw queue message into a typed message and its attributes.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown by the conversion are caught here: each failure is logged and
    /// reported to the message monitor. When the failure is a
    /// <see cref="MessageFormatNotSupportedException"/> the message is also deleted
    /// before the monitor is notified; for any other exception the message is not deleted.
    /// </remarks>
    /// <param name="messageContext">The received message and its converter.</param>
    /// <param name="cancellationToken">Token passed to the delete call on unsupported formats.</param>
    /// <returns>
    /// <c>(true, message, attributes)</c> on success; <c>(false, null, null)</c> on failure.
    /// </returns>
    private async Task<(bool success, Message typedMessage, MessageAttributes attributes)>
        DeserializeMessage(IQueueMessageContext messageContext, CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogDebug("Attempting to deserialize message.");
            var (message, attributes) = messageContext.MessageConverter.ConvertForReceive(messageContext.Message);
            return (true, message, attributes);
        }
        catch (MessageFormatNotSupportedException ex)
        {
            _logger.LogWarning(ex,
                "Could not handle message with Id '{MessageId}' because a deserializer for the content is not configured. Message body: '{MessageBody}'.",
                messageContext.Message.MessageId,
                messageContext.Message.Body);

            await messageContext.DeleteMessage(cancellationToken).ConfigureAwait(false);
            _messagingMonitor.HandleError(ex, messageContext.Message);

            return (false, null, null);
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "Error deserializing message with Id '{MessageId}' and body '{MessageBody}'.",
                messageContext.Message.MessageId,
                messageContext.Message.Body);

            _messagingMonitor.HandleError(ex, messageContext.Message);

            return (false, null, null);
        }
    }
}