diff --git a/examples/src/main/java/io/dapr/examples/state/StateClient.java b/examples/src/main/java/io/dapr/examples/state/StateClient.java index 2363f96c03..cbf01e6d0f 100644 --- a/examples/src/main/java/io/dapr/examples/state/StateClient.java +++ b/examples/src/main/java/io/dapr/examples/state/StateClient.java @@ -88,7 +88,7 @@ public static void main(String[] args) throws Exception { // get multiple states Mono>> retrievedMessagesMono = client.getBulkState(STATE_STORE_NAME, - Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class); + Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class); System.out.println("Retrieved messages using bulk get:"); retrievedMessagesMono.block().forEach(System.out::println); @@ -114,11 +114,11 @@ public static void main(String[] args) throws Exception { // Delete operation using transaction API operationList.clear(); operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.DELETE, - new State<>(SECOND_KEY_NAME))); + new State<>(SECOND_KEY_NAME))); client.executeStateTransaction(STATE_STORE_NAME, operationList).block(); Mono>> retrievedDeletedMessageMono = client.getBulkState(STATE_STORE_NAME, - Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class); + Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class); System.out.println("Trying to retrieve deleted states:"); retrievedDeletedMessageMono.block().forEach(System.out::println); diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java index 131c3521c8..5aabe33439 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorObjectSerializer.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import io.dapr.client.ObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.DurationUtils; import java.io.ByteArrayOutputStream; @@ -37,28 +38,28 @@ public class ActorObjectSerializer extends ObjectSerializer { * {@inheritDoc} */ @Override - public byte[] serialize(Object state) throws IOException { + public SerializedData serializeWithContentType(Object state) throws IOException { if (state == null) { return null; } if (state.getClass() == ActorTimerParams.class) { // Special serializer for this internal classes. - return serialize((ActorTimerParams) state); + return new SerializedData(serialize((ActorTimerParams) state), "application/json"); } if (state.getClass() == ActorReminderParams.class) { // Special serializer for this internal classes. - return serialize((ActorReminderParams) state); + return new SerializedData(serialize((ActorReminderParams) state), "application/json"); } if (state.getClass() == ActorRuntimeConfig.class) { // Special serializer for this internal classes. - return serialize((ActorRuntimeConfig) state); + return new SerializedData(serialize((ActorRuntimeConfig) state), "application/json"); } // Is not an special case. - return super.serialize(state); + return super.serializeWithContentType(state); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index d329946e5a..9389a04bea 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -18,6 +18,7 @@ import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.NetworkUtils; import io.grpc.ManagedChannel; import reactor.core.publisher.Mono; @@ -170,7 +171,7 @@ public ActorRuntimeConfig getConfig() { * @throws IOException If cannot serialize config. */ public byte[] serializeConfig() throws IOException { - return INTERNAL_SERIALIZER.serialize(this.config); + return INTERNAL_SERIALIZER.serializeWithContentType(this.config).getData(); } /** diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java index 28c39e8b8d..6ee2cc53c6 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java @@ -14,8 +14,10 @@ package io.dapr.actors.runtime; import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.TypeRef; +import javax.annotation.Nonnull; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -31,12 +33,13 @@ public class JavaSerializer implements DaprObjectSerializer { * {@inheritDoc} */ @Override - public byte[] serialize(Object o) throws IOException { + @Nonnull + public SerializedData serializeWithContentType(Object o) throws IOException { try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { oos.writeObject(o); oos.flush(); - return bos.toByteArray(); + return new SerializedData(bos.toByteArray(), "application/java"); } } } diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 57b9485e97..0a3b49ebc0 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -31,11 +31,13 @@ import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.TypeRef; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -138,8 +140,9 @@ public void testBulkPublish() throws Exception { 60000)); DaprObjectSerializer serializer = new DaprObjectSerializer() { @Override - public byte[] serialize(Object o) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(o); + @Nonnull + public SerializedData serializeWithContentType(Object o) throws JsonProcessingException { + return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(o), getContentType()); } @Override @@ -266,9 +269,9 @@ public void testPubSub() throws Exception { 60000)); DaprObjectSerializer serializer = new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(o); + @Nonnull + public SerializedData serializeWithContentType(Object o) throws JsonProcessingException { + return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(o), getContentType()); } @Override @@ -480,9 +483,9 @@ public void testPubSubBinary() throws Exception { 60000)); DaprObjectSerializer serializer = new DaprObjectSerializer() { - @Override - public byte[] serialize(Object o) { - return (byte[])o; + @Nonnull + public SerializedData serializeWithContentType(Object o) throws JsonProcessingException { + return new SerializedData((byte[])o, getContentType()); } @Override diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java index 628a4c5ab9..11f74120be 100644 --- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java @@ -501,11 +501,7 @@ public Mono saveState(String storeName, String key, Object value) { */ @Override public Mono saveState(String storeName, String key, String etag, Object value, StateOptions options) { - Map meta = null; - if (value != null) { - meta = Collections.singletonMap("contentType", stateSerializer.getContentType()); - } - State state = new State<>(key, value, etag, meta, options); + State state = new State<>(key, value, etag, null, options); return this.saveBulkState(storeName, Collections.singletonList(state)); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 0c1264eb19..5591059aff 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -63,6 +63,7 @@ import io.dapr.internal.resiliency.TimeoutPolicy; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.DefaultContentTypeConverter; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; @@ -309,16 +310,17 @@ public Mono publishEvent(PublishEventRequest request) { String pubsubName = request.getPubsubName(); String topic = request.getTopic(); Object data = request.getData(); + SerializedData serialized = objectSerializer.serializeWithContentType(data); DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder() .setTopic(topic) .setPubsubName(pubsubName) - .setData(ByteString.copyFrom(objectSerializer.serialize(data))); + .setData(ByteString.copyFrom(serialized.getData())); // Content-type can be overwritten on a per-request basis. // It allows CloudEvents to be handled differently, for example. String contentType = request.getContentType(); if (contentType == null || contentType.isEmpty()) { - contentType = objectSerializer.getContentType(); + contentType = serialized.getContentType(); } envelopeBuilder.setDataContentType(contentType); @@ -367,11 +369,12 @@ public Mono> publishEvents(BulkPublishRequest requ // perform the serialization as per user given input of serializer // this is also the case when content type is empty - data = objectSerializer.serialize(event); + SerializedData serialized = objectSerializer.serializeWithContentType(event); + data = serialized.getData(); if (Strings.isNullOrEmpty(contentType)) { // Only override content type if not given in input by user - contentType = objectSerializer.getContentType(); + contentType = serialized.getContentType(); } } } catch (IOException ex) { @@ -513,15 +516,15 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef if (metadata != null) { headers.putAll(metadata); } - byte[] serializedRequestBody = objectSerializer.serialize(request); + SerializedData serialized = objectSerializer.serializeWithContentType(request); if (contentType != null && !contentType.isEmpty()) { headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, contentType); } else { - headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, objectSerializer.getContentType()); + headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, serialized.getContentType()); } Mono response = Mono.deferContextual( context -> this.httpClient.invokeApi(httpMethod, pathSegments.toArray(new String[0]), - httpExtension.getQueryParams(), serializedRequestBody, headers, context) + httpExtension.getQueryParams(), serialized.getData(), headers, context) ); return response.flatMap(r -> getMonoForHttpResponse(type, r)); } catch (Exception ex) { @@ -564,15 +567,21 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) throw new IllegalArgumentException("Binding operation cannot be null or empty."); } - byte[] byteData = objectSerializer.serialize(data); + SerializedData serialized = objectSerializer.serializeWithContentType(data); DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder() .setName(name).setOperation(operation); - if (byteData != null) { - builder.setData(ByteString.copyFrom(byteData)); + if (serialized.getData() != null) { + builder.setData(ByteString.copyFrom(serialized.getData())); } if (metadata != null) { builder.putAllMetadata(metadata); } + if (builder.getMetadataMap() == null || builder.getMetadataMap().get("contentType") == null) { + if (serialized.getContentType() != null && !serialized.getContentType().isEmpty()) { + builder.putMetadata("contentType", serialized.getContentType()); + } + } + DaprProtos.InvokeBindingRequest envelope = builder.build(); Metadata responseMetadata = new Metadata(); @@ -812,7 +821,7 @@ public Mono saveBulkState(SaveStateRequest request) { } private CommonProtos.StateItem.Builder buildStateRequest(State state) throws IOException { - byte[] bytes = stateSerializer.serialize(state.getValue()); + SerializedData serialized = stateSerializer.serializeWithContentType(state.getValue()); CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder(); if (state.getEtag() != null) { @@ -821,8 +830,13 @@ private CommonProtos.StateItem.Builder buildStateRequest(State state) thr if (state.getMetadata() != null) { stateBuilder.putAllMetadata(state.getMetadata()); } - if (bytes != null) { - stateBuilder.setValue(ByteString.copyFrom(bytes)); + if (serialized.getData() != null) { + stateBuilder.setValue(ByteString.copyFrom(serialized.getData())); + } + if (stateBuilder.getMetadataMap() == null || stateBuilder.getMetadataMap().get("contentType") == null) { + if (serialized.getContentType() != null && !serialized.getContentType().isEmpty()) { + stateBuilder.putMetadata("contentType", serialized.getContentType()); + } } stateBuilder.setKey(state.getKey()); CommonProtos.StateOptions.Builder optionBuilder = null; diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index a131060b90..a2008c6aef 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.MessageLite; import io.dapr.client.domain.CloudEvent; +import io.dapr.serializer.SerializedData; import io.dapr.utils.TypeRef; import java.io.IOException; @@ -51,26 +52,37 @@ protected ObjectSerializer() { * @throws IOException In case state cannot be serialized. */ public byte[] serialize(Object state) throws IOException { + return serializeWithContentType(state).getData(); + } + + /** + * Serializes a given state object into byte array and contentType. + * + * @param state State object to be serialized. + * @return Array of bytes[] with the serialized content. + * @throws IOException In case state cannot be serialized. + */ + public SerializedData serializeWithContentType(Object state) throws IOException { if (state == null) { - return null; + return SerializedData.NULL; } if (state.getClass() == Void.class) { - return null; + return SerializedData.NULL; } // Have this check here to be consistent with deserialization (see deserialize() method below). if (state instanceof byte[]) { - return (byte[]) state; + return new SerializedData((byte[]) state, "application/octet-stream"); } // Proto buffer class is serialized directly. if (state instanceof MessageLite) { - return ((MessageLite) state).toByteArray(); + return new SerializedData(((MessageLite) state).toByteArray(), "application/protobuf"); } // Not string, not primitive, so it is a complex type: we use JSON for that. - return OBJECT_MAPPER.writeValueAsBytes(state); + return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(state), "application/json"); } /** diff --git a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java index 6cd1af65ca..bdac5e724b 100644 --- a/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java @@ -15,6 +15,7 @@ import io.dapr.utils.TypeRef; +import javax.annotation.Nonnull; import java.io.IOException; /** @@ -29,14 +30,26 @@ public interface DaprObjectSerializer { * @return Serialized object. * @throws IOException If cannot serialize. */ - byte[] serialize(Object o) throws IOException; + default byte[] serialize(Object o) throws IOException { + return serializeWithContentType(o).getData(); + } + + /** + * Serializes the given object as byte[] and returns it as a {@link SerializedData} with contentType. + * + * @param obj Object to be serialized. + * @return Serialized object. + * @throws IOException If cannot serialize. + */ + @Nonnull + SerializedData serializeWithContentType(Object obj) throws IOException; /** * Deserializes the given byte[] into a object. * * @param data Data to be deserialized. * @param type Type of object to be deserialized. - * @param Type of object to be deserialized. + * @param Type of object to be deserialized. * @return Deserialized object. * @throws IOException If cannot deserialize object. */ @@ -44,7 +57,7 @@ public interface DaprObjectSerializer { /** * Returns the content type of the request. - * + * * @return content type of the request */ String getContentType(); diff --git a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java index 3527f2e7a5..c878b5fc27 100644 --- a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java @@ -16,6 +16,7 @@ import io.dapr.client.ObjectSerializer; import io.dapr.utils.TypeRef; +import javax.annotation.Nonnull; import java.io.IOException; /** @@ -27,8 +28,9 @@ public class DefaultObjectSerializer extends ObjectSerializer implements DaprObj * {@inheritDoc} */ @Override - public byte[] serialize(Object o) throws IOException { - return super.serialize(o); + @Nonnull + public SerializedData serializeWithContentType(Object o) throws IOException { + return super.serializeWithContentType(o); } /** diff --git a/sdk/src/main/java/io/dapr/serializer/SerializedData.java b/sdk/src/main/java/io/dapr/serializer/SerializedData.java new file mode 100644 index 0000000000..33c2f91f11 --- /dev/null +++ b/sdk/src/main/java/io/dapr/serializer/SerializedData.java @@ -0,0 +1,32 @@ +package io.dapr.serializer; + +/** + * Represents a serialized data, with its content type. + */ +public class SerializedData { + + public static final SerializedData NULL = new SerializedData(null, null); + + /** + * The data has been be serialized. + */ + private final byte[] data; + + /** + * The content type of the serialized data. + */ + private final String contentType; + + public SerializedData(byte[] data, String contentType) { + this.data = data; + this.contentType = contentType; + } + + public byte[] getData() { + return data; + } + + public String getContentType() { + return contentType; + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 336f205353..dc84cb5a21 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -37,6 +37,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.serializer.SerializedData; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; import io.dapr.v1.DaprGrpc; @@ -164,7 +165,7 @@ public void publishEventSerializeException() throws IOException { return null; }).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any()); - when(mockSerializer.serialize(any())).thenThrow(IOException.class); + when(mockSerializer.serializeWithContentType(any())).thenThrow(IOException.class); Mono result = client.publishEvent("pubsubname","topic", "{invalid-json"); assertThrowsDaprException( @@ -282,7 +283,7 @@ public void invokeBindingSerializeException() throws IOException { return null; }).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any()); - when(mockSerializer.serialize(any())).thenThrow(IOException.class); + when(mockSerializer.serializeWithContentType(any())).thenThrow(IOException.class); Mono result = client.invokeBinding("BindingName", "MyOperation", "request".getBytes(), Collections.EMPTY_MAP); assertThrowsDaprException( @@ -1031,7 +1032,7 @@ public void executeTransactionSerializerExceptionTest() throws IOException { }).when(daprStub).executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class), any()); - when(mockSerializer.serialize(any())).thenThrow(IOException.class); + when(mockSerializer.serializeWithContentType(any())).thenThrow(IOException.class); State stateKey = buildStateKey(data, key, etag, options); TransactionalStateOperation upsertOperation = new TransactionalStateOperation<>( TransactionalStateOperation.OperationType.UPSERT, diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index a28dad0f42..f329483e23 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -169,7 +169,7 @@ public void publishEventsSerializeException() throws IOException { "application/json", null); BulkPublishRequest> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, Collections.singletonList(entry)); - when(mockSerializer.serialize(any())).thenThrow(IOException.class); + when(mockSerializer.serializeWithContentType(any())).thenThrow(IOException.class); Mono>> result = previewClient.publishEvents(req); assertThrowsDaprException(