Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set contentType when invoke binding #1249

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void main(String[] args) throws Exception {

// get multiple states
Mono<List<State<MyClass>>> retrievedMessagesMono = client.getBulkState(STATE_STORE_NAME,
Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class);
Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class);
System.out.println("Retrieved messages using bulk get:");
retrievedMessagesMono.block().forEach(System.out::println);

Expand All @@ -114,11 +114,11 @@ public static void main(String[] args) throws Exception {
// Delete operation using transaction API
operationList.clear();
operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.DELETE,
new State<>(SECOND_KEY_NAME)));
new State<>(SECOND_KEY_NAME)));
client.executeStateTransaction(STATE_STORE_NAME, operationList).block();

Mono<List<State<MyClass>>> retrievedDeletedMessageMono = client.getBulkState(STATE_STORE_NAME,
Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class);
Arrays.asList(FIRST_KEY_NAME, SECOND_KEY_NAME), MyClass.class);
System.out.println("Trying to retrieve deleted states:");
retrievedDeletedMessageMono.block().forEach(System.out::println);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import io.dapr.client.ObjectSerializer;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.DurationUtils;

import java.io.ByteArrayOutputStream;
Expand All @@ -37,28 +38,28 @@ public class ActorObjectSerializer extends ObjectSerializer {
* {@inheritDoc}
*/
@Override
public byte[] serialize(Object state) throws IOException {
public SerializedData serializeWithContentType(Object state) throws IOException {
if (state == null) {
return null;
}

if (state.getClass() == ActorTimerParams.class) {
// Special serializer for this internal classes.
return serialize((ActorTimerParams) state);
return new SerializedData(serialize((ActorTimerParams) state), "application/json");
}

if (state.getClass() == ActorReminderParams.class) {
// Special serializer for this internal classes.
return serialize((ActorReminderParams) state);
return new SerializedData(serialize((ActorReminderParams) state), "application/json");
}

if (state.getClass() == ActorRuntimeConfig.class) {
// Special serializer for this internal classes.
return serialize((ActorRuntimeConfig) state);
return new SerializedData(serialize((ActorRuntimeConfig) state), "application/json");
}

// Is not an special case.
return super.serialize(state);
return super.serializeWithContentType(state);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -170,7 +171,7 @@ public ActorRuntimeConfig getConfig() {
* @throws IOException If cannot serialize config.
*/
public byte[] serializeConfig() throws IOException {
return INTERNAL_SERIALIZER.serialize(this.config);
return INTERNAL_SERIALIZER.serializeWithContentType(this.config).getData();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package io.dapr.actors.runtime;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.TypeRef;

import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -31,12 +33,13 @@ public class JavaSerializer implements DaprObjectSerializer {
* {@inheritDoc}
*/
@Override
public byte[] serialize(Object o) throws IOException {
@Nonnull
public SerializedData serializeWithContentType(Object o) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(o);
oos.flush();
return bos.toByteArray();
return new SerializedData(bos.toByteArray(), "application/java");
}
}
}
Expand Down
19 changes: 11 additions & 8 deletions sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.TypeRef;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -138,8 +140,9 @@ public void testBulkPublish() throws Exception {
60000));
DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(o);
@Nonnull
public SerializedData serializeWithContentType(Object o) throws JsonProcessingException {
return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(o), getContentType());
}

@Override
Expand Down Expand Up @@ -266,9 +269,9 @@ public void testPubSub() throws Exception {
60000));

DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(o);
@Nonnull
public SerializedData serializeWithContentType(Object o) throws JsonProcessingException {
return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(o), getContentType());
}

@Override
Expand Down Expand Up @@ -480,9 +483,9 @@ public void testPubSubBinary() throws Exception {
60000));

DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) {
return (byte[])o;
@Nonnull
public SerializedData serializeWithContentType(Object o) throws JsonProcessingException {
return new SerializedData((byte[])o, getContentType());
}

@Override
Expand Down
6 changes: 1 addition & 5 deletions sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,7 @@ public Mono<Void> saveState(String storeName, String key, Object value) {
*/
@Override
public Mono<Void> saveState(String storeName, String key, String etag, Object value, StateOptions options) {
Map<String, String> meta = null;
if (value != null) {
meta = Collections.singletonMap("contentType", stateSerializer.getContentType());
}
State<?> state = new State<>(key, value, etag, meta, options);
State<?> state = new State<>(key, value, etag, null, options);
return this.saveBulkState(storeName, Collections.singletonList(state));
}

Expand Down
40 changes: 27 additions & 13 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
Expand Down Expand Up @@ -309,16 +310,17 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
SerializedData serialized = objectSerializer.serializeWithContentType(data);
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
.setData(ByteString.copyFrom(serialized.getData()));

// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
contentType = serialized.getContentType();
}
envelopeBuilder.setDataContentType(contentType);

Expand Down Expand Up @@ -367,11 +369,12 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
// perform the serialization as per user given input of serializer
// this is also the case when content type is empty

data = objectSerializer.serialize(event);
SerializedData serialized = objectSerializer.serializeWithContentType(event);
data = serialized.getData();

if (Strings.isNullOrEmpty(contentType)) {
// Only override content type if not given in input by user
contentType = objectSerializer.getContentType();
contentType = serialized.getContentType();
}
}
} catch (IOException ex) {
Expand Down Expand Up @@ -513,15 +516,15 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
if (metadata != null) {
headers.putAll(metadata);
}
byte[] serializedRequestBody = objectSerializer.serialize(request);
SerializedData serialized = objectSerializer.serializeWithContentType(request);
if (contentType != null && !contentType.isEmpty()) {
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, contentType);
} else {
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, objectSerializer.getContentType());
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE, serialized.getContentType());
}
Mono<DaprHttp.Response> response = Mono.deferContextual(
context -> this.httpClient.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
httpExtension.getQueryParams(), serialized.getData(), headers, context)
);
return response.flatMap(r -> getMonoForHttpResponse(type, r));
} catch (Exception ex) {
Expand Down Expand Up @@ -564,15 +567,21 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
throw new IllegalArgumentException("Binding operation cannot be null or empty.");
}

byte[] byteData = objectSerializer.serialize(data);
SerializedData serialized = objectSerializer.serializeWithContentType(data);
DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder()
.setName(name).setOperation(operation);
if (byteData != null) {
builder.setData(ByteString.copyFrom(byteData));
if (serialized.getData() != null) {
builder.setData(ByteString.copyFrom(serialized.getData()));
}
if (metadata != null) {
builder.putAllMetadata(metadata);
}
if (builder.getMetadataMap() == null || builder.getMetadataMap().get("contentType") == null) {
if (serialized.getContentType() != null && !serialized.getContentType().isEmpty()) {
builder.putMetadata("contentType", serialized.getContentType());
}
}

DaprProtos.InvokeBindingRequest envelope = builder.build();

Metadata responseMetadata = new Metadata();
Expand Down Expand Up @@ -812,7 +821,7 @@ public Mono<Void> saveBulkState(SaveStateRequest request) {
}

private <T> CommonProtos.StateItem.Builder buildStateRequest(State<T> state) throws IOException {
byte[] bytes = stateSerializer.serialize(state.getValue());
SerializedData serialized = stateSerializer.serializeWithContentType(state.getValue());

CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder();
if (state.getEtag() != null) {
Expand All @@ -821,8 +830,13 @@ private <T> CommonProtos.StateItem.Builder buildStateRequest(State<T> state) thr
if (state.getMetadata() != null) {
stateBuilder.putAllMetadata(state.getMetadata());
}
if (bytes != null) {
stateBuilder.setValue(ByteString.copyFrom(bytes));
if (serialized.getData() != null) {
stateBuilder.setValue(ByteString.copyFrom(serialized.getData()));
}
if (stateBuilder.getMetadataMap() == null || stateBuilder.getMetadataMap().get("contentType") == null) {
if (serialized.getContentType() != null && !serialized.getContentType().isEmpty()) {
stateBuilder.putMetadata("contentType", serialized.getContentType());
}
}
stateBuilder.setKey(state.getKey());
CommonProtos.StateOptions.Builder optionBuilder = null;
Expand Down
22 changes: 17 additions & 5 deletions sdk/src/main/java/io/dapr/client/ObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.MessageLite;
import io.dapr.client.domain.CloudEvent;
import io.dapr.serializer.SerializedData;
import io.dapr.utils.TypeRef;

import java.io.IOException;
Expand Down Expand Up @@ -51,26 +52,37 @@ protected ObjectSerializer() {
* @throws IOException In case state cannot be serialized.
*/
public byte[] serialize(Object state) throws IOException {
return serializeWithContentType(state).getData();
}

/**
* Serializes a given state object into byte array and contentType.
*
* @param state State object to be serialized.
* @return Array of bytes[] with the serialized content.
* @throws IOException In case state cannot be serialized.
*/
public SerializedData serializeWithContentType(Object state) throws IOException {
if (state == null) {
return null;
return SerializedData.NULL;
}

if (state.getClass() == Void.class) {
return null;
return SerializedData.NULL;
}

// Have this check here to be consistent with deserialization (see deserialize() method below).
if (state instanceof byte[]) {
return (byte[]) state;
return new SerializedData((byte[]) state, "application/octet-stream");
}

// Proto buffer class is serialized directly.
if (state instanceof MessageLite) {
return ((MessageLite) state).toByteArray();
return new SerializedData(((MessageLite) state).toByteArray(), "application/protobuf");
}

// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsBytes(state);
return new SerializedData(OBJECT_MAPPER.writeValueAsBytes(state), "application/json");
}

/**
Expand Down
19 changes: 16 additions & 3 deletions sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.dapr.utils.TypeRef;

import javax.annotation.Nonnull;
import java.io.IOException;

/**
Expand All @@ -29,22 +30,34 @@ public interface DaprObjectSerializer {
* @return Serialized object.
* @throws IOException If cannot serialize.
*/
byte[] serialize(Object o) throws IOException;
default byte[] serialize(Object o) throws IOException {
return serializeWithContentType(o).getData();
}

/**
* Serializes the given object as byte[] and returns it as a {@link SerializedData} with contentType.
*
* @param obj Object to be serialized.
* @return Serialized object.
* @throws IOException If cannot serialize.
*/
@Nonnull
SerializedData serializeWithContentType(Object obj) throws IOException;

/**
* Deserializes the given byte[] into a object.
*
* @param data Data to be deserialized.
* @param type Type of object to be deserialized.
* @param <T> Type of object to be deserialized.
* @param <T> Type of object to be deserialized.
* @return Deserialized object.
* @throws IOException If cannot deserialize object.
*/
<T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;

/**
* Returns the content type of the request.
*
*
* @return content type of the request
*/
String getContentType();
Expand Down
Loading
Loading