@@ -102,18 +102,6 @@ async def _receive_messages(
102
102
logger .info (f"Received message from client { client_id } : { message } " )
103
103
oneofcase = message .WhichOneof ("message" )
104
104
match oneofcase :
105
- case "request" :
106
- request : agent_worker_pb2 .RpcRequest = message .request
107
- task = asyncio .create_task (self ._process_request (request , client_id ))
108
- self ._background_tasks .add (task )
109
- task .add_done_callback (self ._raise_on_exception )
110
- task .add_done_callback (self ._background_tasks .discard )
111
- case "response" :
112
- response : agent_worker_pb2 .RpcResponse = message .response
113
- task = asyncio .create_task (self ._process_response (response , client_id ))
114
- self ._background_tasks .add (task )
115
- task .add_done_callback (self ._raise_on_exception )
116
- task .add_done_callback (self ._background_tasks .discard )
117
105
case "cloudEvent" :
118
106
# The proto typing doesnt resolve this one
119
107
event = cast (cloudevent_pb2 .CloudEvent , message .cloudEvent ) # type: ignore
@@ -140,43 +128,6 @@ async def _receive_messages(
140
128
case None :
141
129
logger .warning ("Received empty message" )
142
130
143
- async def _process_request (self , request : agent_worker_pb2 .RpcRequest , client_id : int ) -> None :
144
- # Deliver the message to a client given the target agent type.
145
- async with self ._agent_type_to_client_id_lock :
146
- target_client_id = self ._agent_type_to_client_id .get (request .target .type )
147
- if target_client_id is None :
148
- logger .error (f"Agent { request .target .type } not found, failed to deliver message." )
149
- return
150
- target_send_queue = self ._send_queues .get (target_client_id )
151
- if target_send_queue is None :
152
- logger .error (f"Client { target_client_id } not found, failed to deliver message." )
153
- return
154
- await target_send_queue .put (agent_worker_pb2 .Message (request = request ))
155
-
156
- # Create a future to wait for the response from the target.
157
- future = asyncio .get_event_loop ().create_future ()
158
- self ._pending_responses .setdefault (target_client_id , {})[request .request_id ] = future
159
-
160
- # Create a task to wait for the response and send it back to the client.
161
- send_response_task = asyncio .create_task (self ._wait_and_send_response (future , client_id ))
162
- self ._background_tasks .add (send_response_task )
163
- send_response_task .add_done_callback (self ._raise_on_exception )
164
- send_response_task .add_done_callback (self ._background_tasks .discard )
165
-
166
- async def _wait_and_send_response (self , future : Future [agent_worker_pb2 .RpcResponse ], client_id : int ) -> None :
167
- response = await future
168
- message = agent_worker_pb2 .Message (response = response )
169
- send_queue = self ._send_queues .get (client_id )
170
- if send_queue is None :
171
- logger .error (f"Client { client_id } not found, failed to send response message." )
172
- return
173
- await send_queue .put (message )
174
-
175
- async def _process_response (self , response : agent_worker_pb2 .RpcResponse , client_id : int ) -> None :
176
- # Setting the result of the future will send the response back to the original sender.
177
- future = self ._pending_responses [client_id ].pop (response .request_id )
178
- future .set_result (response )
179
-
180
131
async def _process_event (self , event : cloudevent_pb2 .CloudEvent ) -> None :
181
132
topic_id = TopicId (type = event .type , source = event .source )
182
133
recipients = await self ._subscription_manager .get_subscribed_recipients (topic_id )
0 commit comments