@@ -20,6 +20,8 @@ public sealed class GrpcGateway : BackgroundService, IGateway
20
20
// The agents supported by each worker process.
21
21
private readonly ConcurrentDictionary < string , List < GrpcWorkerConnection > > _supportedAgentTypes = [ ] ;
22
22
public readonly ConcurrentDictionary < IConnection , IConnection > _workers = new ( ) ;
23
+ private readonly ConcurrentDictionary < string , Subscription > _subscriptionsByAgentType = new ( ) ;
24
+ private readonly ConcurrentDictionary < string , List < string > > _subscriptionsByTopic = new ( ) ;
23
25
24
26
// The mapping from agent id to worker process.
25
27
private readonly ConcurrentDictionary < ( string Type , string Key ) , GrpcWorkerConnection > _agentDirectory = new ( ) ;
@@ -102,10 +104,22 @@ internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Mess
102
104
case Message . MessageOneofCase . RegisterAgentTypeRequest :
103
105
await RegisterAgentTypeAsync ( connection , message . RegisterAgentTypeRequest ) ;
104
106
break ;
107
+ case Message . MessageOneofCase . AddSubscriptionRequest :
108
+ await AddSubscriptionAsync ( connection , message . AddSubscriptionRequest ) ;
109
+ break ;
105
110
default :
106
111
throw new InvalidOperationException ( $ "Unknown message type for message '{ message } '.") ;
107
112
} ;
108
113
}
114
+ private async ValueTask AddSubscriptionAsync ( GrpcWorkerConnection connection , AddSubscriptionRequest request )
115
+ {
116
+ var topic = request . Subscription . TypeSubscription . TopicType ;
117
+ var agentType = request . Subscription . TypeSubscription . AgentType ;
118
+ _subscriptionsByAgentType [ agentType ] = request . Subscription ;
119
+ _subscriptionsByTopic [ topic ] . Add ( agentType ) ;
120
+ var response = new AddSubscriptionResponse { RequestId = request . RequestId , Error = "" , Success = true } ;
121
+ await connection . ResponseStream . WriteAsync ( response ) . ConfigureAwait ( false ) ;
122
+ }
109
123
private async ValueTask RegisterAgentTypeAsync ( GrpcWorkerConnection connection , RegisterAgentTypeRequest msg )
110
124
{
111
125
connection . AddSupportedType ( msg . Type ) ;
0 commit comments