Skip to content

Commit

Permalink
Updates to proto for state apis (#5407)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits authored Feb 6, 2025
1 parent da6f918 commit 25f26a3
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public override async Task OpenChannel(IAsyncStreamReader<Message> requestStream
throw;
}
}
public override async Task<GetStateResponse> GetState(AgentId request, ServerCallContext context) => new GetStateResponse { AgentState = new AgentState { AgentId = request } };
public override async Task<SaveStateResponse> SaveState(AgentState request, ServerCallContext context) => new SaveStateResponse { };
public override async Task<AddSubscriptionResponse> AddSubscription(AddSubscriptionRequest request, ServerCallContext context) => new AddSubscriptionResponse { };
public override async Task<RemoveSubscriptionResponse> RemoveSubscription(RemoveSubscriptionRequest request, ServerCallContext context) => new RemoveSubscriptionResponse { };
public override async Task<GetSubscriptionsResponse> GetSubscriptions(GetSubscriptionsRequest request, ServerCallContext context) => new GetSubscriptionsResponse { };
Expand Down
57 changes: 38 additions & 19 deletions protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,6 @@ message GetSubscriptionsResponse {
repeated Subscription subscriptions = 1;
}

message AgentState {
AgentId agent_id = 1;
string eTag = 2;
oneof data {
bytes binary_data = 3;
string text_data = 4;
google.protobuf.Any proto_data = 5;
}
}

message GetStateResponse {
AgentState agent_state = 1;
}

message SaveStateResponse {
}

message Message {
oneof message {
RpcRequest request = 1;
Expand All @@ -104,10 +87,46 @@ message Message {
}
}

message SaveStateRequest {
AgentId agentId = 1;
}

message SaveStateResponse {
string state = 1;
optional string error = 2;
}

message LoadStateRequest {
AgentId agentId = 1;
string state = 2;
}
message LoadStateResponse {
optional string error = 1;
}

message ControlMessage {
// A response message should have the same id as the request message
string rpc_id = 1;
// This is either:
// agentid=AGENT_ID
// clientid=CLIENT_ID
string destination = 2;
// This is either:
// agentid=AGENT_ID
// clientid=CLIENT_ID
// Empty string means the message is a response
optional string respond_to = 3;
// One of:
// SaveStateRequest saveStateRequest = 2;
// SaveStateResponse saveStateResponse = 3;
// LoadStateRequest loadStateRequest = 4;
// LoadStateResponse loadStateResponse = 5;
google.protobuf.Any rpcMessage = 4;
}

service AgentRpc {
rpc OpenChannel (stream Message) returns (stream Message);
rpc GetState(AgentId) returns (GetStateResponse);
rpc SaveState(AgentState) returns (SaveStateResponse);
rpc OpenControlChannel (stream ControlMessage) returns (stream ControlMessage);
rpc RegisterAgent(RegisterAgentTypeRequest) returns (RegisterAgentTypeResponse);
rpc AddSubscription(AddSubscriptionRequest) returns (AddSubscriptionResponse);
rpc RemoveSubscription(RemoveSubscriptionRequest) returns (RemoveSubscriptionResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ async def handle_callback(message: agent_worker_pb2.Message) -> None:
# Remove the client id from the agent type to client id mapping.
await self._on_client_disconnect(client_id)

async def OpenControlChannel( # type: ignore
self,
request_iterator: AsyncIterator[agent_worker_pb2.ControlMessage],
context: grpc.aio.ServicerContext[agent_worker_pb2.ControlMessage, agent_worker_pb2.ControlMessage],
) -> AsyncIterator[agent_worker_pb2.ControlMessage]:
raise NotImplementedError("Method not implemented.")

async def _on_client_disconnect(self, client_id: ClientConnectionId) -> None:
async with self._agent_type_to_client_id_lock:
agent_types = [agent_type for agent_type, id_ in self._agent_type_to_client_id.items() if id_ == client_id]
Expand Down Expand Up @@ -288,17 +295,3 @@ async def GetSubscriptions( # type: ignore
) -> agent_worker_pb2.GetSubscriptionsResponse:
_client_id = await get_client_id_or_abort(context)
raise NotImplementedError("Method not implemented.")

async def GetState( # type: ignore
self,
request: agent_worker_pb2.AgentId,
context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.GetStateResponse],
) -> agent_worker_pb2.GetStateResponse:
raise NotImplementedError("Method not implemented!")

async def SaveState( # type: ignore
self,
request: agent_worker_pb2.AgentState,
context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.SaveStateResponse],
) -> agent_worker_pb2.SaveStateResponse:
raise NotImplementedError("Method not implemented!")

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 25f26a3

Please sign in to comment.