Skip to content

Commit 6af57b6

Browse files
committedNov 27, 2024
Cloud event publishing
1 parent fe96f7d commit 6af57b6

File tree

5 files changed

+40
-29
lines changed

5 files changed

+40
-29
lines changed
 

‎protos/agent_worker.proto

+1-2
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,11 @@ message Message {
117117
oneof message {
118118
RpcRequest request = 1;
119119
RpcResponse response = 2;
120-
Event event = 3;
120+
cloudevent.CloudEvent cloudEvent = 3;
121121
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
122122
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
123123
AddSubscriptionRequest addSubscriptionRequest = 6;
124124
AddSubscriptionResponse addSubscriptionResponse = 7;
125-
cloudevent.CloudEvent cloudEvent = 8;
126125
}
127126
}
128127

‎python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py

+27-13
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
cast,
2929
)
3030

31+
from autogen_core.application.protos import cloudevent_pb2
3132
from opentelemetry.trace import TracerProvider
3233
from typing_extensions import Self, deprecated
3334

3435
from ..base import (
36+
PROTOBUF_DATA_CONTENT_TYPE,
3537
JSON_DATA_CONTENT_TYPE,
3638
Agent,
3739
AgentId,
@@ -178,6 +180,7 @@ def __init__(
178180
host_address: str,
179181
tracer_provider: TracerProvider | None = None,
180182
extra_grpc_config: ChannelArgumentType | None = None,
183+
payload_serialization_format: str = JSON_DATA_CONTENT_TYPE,
181184
) -> None:
182185
self._host_address = host_address
183186
self._trace_helper = TraceHelper(tracer_provider, MessageRuntimeTracingConfig("Worker Runtime"))
@@ -198,6 +201,11 @@ def __init__(
198201
self._serialization_registry = SerializationRegistry()
199202
self._extra_grpc_config = extra_grpc_config or []
200203

204+
if payload_serialization_format not in {JSON_DATA_CONTENT_TYPE, PROTOBUF_DATA_CONTENT_TYPE}:
205+
raise ValueError(f"Unsupported payload serialization format: {payload_serialization_format}")
206+
207+
self._payload_serialization_format = payload_serialization_format
208+
201209
def start(self) -> None:
202210
"""Start the runtime in a background task."""
203211
if self._running:
@@ -381,27 +389,33 @@ async def publish_message(
381389
if message_id is None:
382390
message_id = str(uuid.uuid4())
383391

384-
# TODO: consume message_id
385-
386392
message_type = self._serialization_registry.type_name(message)
387393
with self._trace_helper.trace_block(
388394
"create", topic_id, parent=None, extraAttributes={"message_type": message_type}
389395
):
390396
serialized_message = self._serialization_registry.serialize(
391-
message, type_name=message_type, data_content_type=JSON_DATA_CONTENT_TYPE
397+
message, type_name=message_type, data_content_type=self._payload_serialization_format
392398
)
399+
400+
sender_id = sender or AgentId("unknown", "unknown")
401+
attributes = {
402+
"datacontenttype": cloudevent_pb2.CloudEvent.CloudEventAttributeValue(ce_string=self._payload_serialization_format),
403+
"dataschema": cloudevent_pb2.CloudEvent.CloudEventAttributeValue(ce_string=message_type),
404+
"agagentsendertype": cloudevent_pb2.CloudEvent.CloudEventAttributeValue(ce_string=sender_id.type),
405+
"agagentsenderkey": cloudevent_pb2.CloudEvent.CloudEventAttributeValue(ce_string=sender_id.key),
406+
"agmsgkind": cloudevent_pb2.CloudEvent.CloudEventAttributeValue(ce_string="publish"),
407+
}
408+
393409
telemetry_metadata = get_telemetry_grpc_metadata()
394410
runtime_message = agent_worker_pb2.Message(
395-
event=agent_worker_pb2.Event(
396-
topic_type=topic_id.type,
397-
topic_source=topic_id.source,
398-
source=agent_worker_pb2.AgentId(type=sender.type, key=sender.key) if sender is not None else None,
399-
metadata=telemetry_metadata,
400-
payload=agent_worker_pb2.Payload(
401-
data_type=message_type,
402-
data=serialized_message,
403-
data_content_type=JSON_DATA_CONTENT_TYPE,
404-
),
411+
cloudEvent=cloudevent_pb2.CloudEvent(
412+
id=message_id,
413+
spec_version="1.0",
414+
type=topic_id.type,
415+
source=topic_id.source,
416+
attributes=attributes,
417+
# TODO: use text, or proto fields appropriately
418+
binary_data=serialized_message,
405419
)
406420
)
407421

‎python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi

+6-10
Original file line numberDiff line numberDiff line change
@@ -437,18 +437,17 @@ class Message(google.protobuf.message.Message):
437437

438438
REQUEST_FIELD_NUMBER: builtins.int
439439
RESPONSE_FIELD_NUMBER: builtins.int
440-
EVENT_FIELD_NUMBER: builtins.int
440+
CLOUDEVENT_FIELD_NUMBER: builtins.int
441441
REGISTERAGENTTYPEREQUEST_FIELD_NUMBER: builtins.int
442442
REGISTERAGENTTYPERESPONSE_FIELD_NUMBER: builtins.int
443443
ADDSUBSCRIPTIONREQUEST_FIELD_NUMBER: builtins.int
444444
ADDSUBSCRIPTIONRESPONSE_FIELD_NUMBER: builtins.int
445-
CLOUDEVENT_FIELD_NUMBER: builtins.int
446445
@property
447446
def request(self) -> global___RpcRequest: ...
448447
@property
449448
def response(self) -> global___RpcResponse: ...
450449
@property
451-
def event(self) -> global___Event: ...
450+
def cloudEvent(self) -> cloudevent_pb2.CloudEvent: ...
452451
@property
453452
def registerAgentTypeRequest(self) -> global___RegisterAgentTypeRequest: ...
454453
@property
@@ -457,22 +456,19 @@ class Message(google.protobuf.message.Message):
457456
def addSubscriptionRequest(self) -> global___AddSubscriptionRequest: ...
458457
@property
459458
def addSubscriptionResponse(self) -> global___AddSubscriptionResponse: ...
460-
@property
461-
def cloudEvent(self) -> cloudevent_pb2.CloudEvent: ...
462459
def __init__(
463460
self,
464461
*,
465462
request: global___RpcRequest | None = ...,
466463
response: global___RpcResponse | None = ...,
467-
event: global___Event | None = ...,
464+
cloudEvent: cloudevent_pb2.CloudEvent | None = ...,
468465
registerAgentTypeRequest: global___RegisterAgentTypeRequest | None = ...,
469466
registerAgentTypeResponse: global___RegisterAgentTypeResponse | None = ...,
470467
addSubscriptionRequest: global___AddSubscriptionRequest | None = ...,
471468
addSubscriptionResponse: global___AddSubscriptionResponse | None = ...,
472-
cloudEvent: cloudevent_pb2.CloudEvent | None = ...,
473469
) -> None: ...
474-
def HasField(self, field_name: typing.Literal["addSubscriptionRequest", b"addSubscriptionRequest", "addSubscriptionResponse", b"addSubscriptionResponse", "cloudEvent", b"cloudEvent", "event", b"event", "message", b"message", "registerAgentTypeRequest", b"registerAgentTypeRequest", "registerAgentTypeResponse", b"registerAgentTypeResponse", "request", b"request", "response", b"response"]) -> builtins.bool: ...
475-
def ClearField(self, field_name: typing.Literal["addSubscriptionRequest", b"addSubscriptionRequest", "addSubscriptionResponse", b"addSubscriptionResponse", "cloudEvent", b"cloudEvent", "event", b"event", "message", b"message", "registerAgentTypeRequest", b"registerAgentTypeRequest", "registerAgentTypeResponse", b"registerAgentTypeResponse", "request", b"request", "response", b"response"]) -> None: ...
476-
def WhichOneof(self, oneof_group: typing.Literal["message", b"message"]) -> typing.Literal["request", "response", "event", "registerAgentTypeRequest", "registerAgentTypeResponse", "addSubscriptionRequest", "addSubscriptionResponse", "cloudEvent"] | None: ...
470+
def HasField(self, field_name: typing.Literal["addSubscriptionRequest", b"addSubscriptionRequest", "addSubscriptionResponse", b"addSubscriptionResponse", "cloudEvent", b"cloudEvent", "message", b"message", "registerAgentTypeRequest", b"registerAgentTypeRequest", "registerAgentTypeResponse", b"registerAgentTypeResponse", "request", b"request", "response", b"response"]) -> builtins.bool: ...
471+
def ClearField(self, field_name: typing.Literal["addSubscriptionRequest", b"addSubscriptionRequest", "addSubscriptionResponse", b"addSubscriptionResponse", "cloudEvent", b"cloudEvent", "message", b"message", "registerAgentTypeRequest", b"registerAgentTypeRequest", "registerAgentTypeResponse", b"registerAgentTypeResponse", "request", b"request", "response", b"response"]) -> None: ...
472+
def WhichOneof(self, oneof_group: typing.Literal["message", b"message"]) -> typing.Literal["request", "response", "cloudEvent", "registerAgentTypeRequest", "registerAgentTypeResponse", "addSubscriptionRequest", "addSubscriptionResponse"] | None: ...
477473

478474
global___Message = Message

‎python/packages/autogen-core/src/autogen_core/base/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ._message_context import MessageContext
1616
from ._message_handler_context import MessageHandlerContext
1717
from ._serialization import (
18+
PROTOBUF_DATA_CONTENT_TYPE,
1819
JSON_DATA_CONTENT_TYPE,
1920
MessageSerializer,
2021
SerializationRegistry,
@@ -43,6 +44,7 @@
4344
"SubscriptionInstantiationContext",
4445
"MessageHandlerContext",
4546
"JSON_DATA_CONTENT_TYPE",
47+
"PROTOBUF_DATA_CONTENT_TYPE",
4648
"MessageSerializer",
4749
"try_get_known_serializers_for_type",
4850
"UnknownPayload",

0 commit comments

Comments
 (0)