From 929bbd0eb654c4108c8ec2cf07f3f54652454b28 Mon Sep 17 00:00:00 2001
From: Pierangelo Di Pilato
reply
Reply is the configuration on how to handle responses from Sink. +It can only be set if Sink is set.
+EventTransformations
reply
Reply is the configuration on how to handle responses from Sink. +It can only be set if Sink is set.
+EventTransformations
-(Appears on:EventTransformSpec) +(Appears on:EventTransformSpec, ReplySpec)
@@ -3595,6 +3625,59 @@ Kubernetes apps/v1.DeploymentStatus
+(Appears on:EventTransformSpec) +
++
ReplySpec is the configurations on how to handle responses from Sink.
+ +Field | +Description | +
---|---|
+EventTransformations + + +EventTransformations + + + |
+
+
+(Members of EventTransformations for replies from the Sink, contain all possible transformations, +only one “type” can be used. +The used type must match the top-level transformation, if you need to mix transformation types, +use compositions and chain transformations together to achieve your desired outcome. + |
+
+discard + +bool + + |
+
+(Optional)
+ Discard discards responses from Sink and return empty response body. +When set to false, it returns the exact sink response body. +When set to true, Discard is mutually exclusive with EventTransformations in the reply +section, it can either be discarded or transformed. +Default: false. + |
+
diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_types.go b/pkg/apis/eventing/v1alpha1/eventtransform_types.go index a7a5d13e46f..1680737bfdc 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_types.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_types.go @@ -74,10 +74,37 @@ type EventTransformSpec struct { // +optional Sink *duckv1.Destination `json:"sink,omitempty"` + // Reply is the configuration on how to handle responses from Sink. + // It can only be set if Sink is set. + // + // +optional + Reply *ReplySpec `json:"reply,omitempty"` + // EventTransformations contain all possible transformations, only one "type" can be used. EventTransformations `json:",inline"` } +// ReplySpec is the configurations on how to handle responses from Sink. +type ReplySpec struct { + // EventTransformations for replies from the Sink, contain all possible transformations, + // only one "type" can be used. + // + // The used type must match the top-level transformation, if you need to mix transformation types, + // use compositions and chain transformations together to achieve your desired outcome. + EventTransformations `json:",inline"` + + // Discard discards responses from Sink and return empty response body. + // + // When set to false, it returns the exact sink response body. + // When set to true, Discard is mutually exclusive with EventTransformations in the reply + // section, it can either be discarded or transformed. + // + // Default: false. + // + // +optional + Discard *bool `json:"discard,omitempty"` +} + type EventTransformations struct { Jsonata *JsonataEventTransformationSpec `json:"jsonata,omitempty"` } diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go index 7954a42560e..b3521d5a64e 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_validation.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation.go @@ -18,7 +18,10 @@ package v1alpha1 import ( "context" + "fmt" + "strings" + "k8s.io/apimachinery/pkg/util/sets" "knative.dev/pkg/apis" ) @@ -30,28 +33,82 @@ func (t *EventTransform) Validate(ctx context.Context) *apis.FieldError { var possibleTransformations = []string{"jsonata"} func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError { + errs := ts.EventTransformations.Validate(ctx /* allowEmpty */, false) + errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink")) + errs = errs.Also(ts.Reply.Validate(ctx, ts).ViaField("reply")) + + if apis.IsInUpdate(ctx) { + original := apis.GetBaseline(ctx).(*EventTransform) + errs = errs.Also(ts.CheckImmutableFields(ctx, original)) + } + + return errs +} + +func (ets EventTransformations) Validate(ctx context.Context, allowEmpty bool) *apis.FieldError { var errs *apis.FieldError + // Only one type of transformation is allowed. + // These are transformations field paths. + transformations := ets.transformations() + + if len(transformations) == 0 && !allowEmpty { + errs = apis.ErrMissingOneOf(possibleTransformations...) + } else if len(transformations) > 1 { + errs = apis.ErrMultipleOneOf(transformations...) + } + + errs = errs.Also(ets.Jsonata.Validate(ctx).ViaField("jsonata")) + + return errs +} + +func (ets EventTransformations) transformations() []string { // Only one type of transformation is allowed. // These are transformations field paths. transformations := make([]string, 0, 2) - if ts.EventTransformations.Jsonata != nil { + if ets.Jsonata != nil { transformations = append(transformations, "jsonata") } + return transformations +} - if len(transformations) == 0 { - errs = errs.Also(apis.ErrMissingOneOf(possibleTransformations...)) - } else if len(transformations) > 1 { - errs = errs.Also(apis.ErrMultipleOneOf(transformations...)) +func (rs *ReplySpec) Validate(ctx context.Context, ts *EventTransformSpec) *apis.FieldError { + if rs == nil { + return nil + } + if ts.Sink == nil { + return apis.ErrGeneric( + "reply is set without spec.sink", + "", + ) } - errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata")) - errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink")) + errs := rs.EventTransformations.Validate(ctx /* allowEmpty */, true) - if apis.IsInUpdate(ctx) { - original := apis.GetBaseline(ctx).(*EventTransform) - errs = errs.Also(ts.CheckImmutableFields(ctx, original)) + baseTransformationsSet := sets.New(ts.EventTransformations.transformations()...) + replyTransformationsSet := sets.New(rs.EventTransformations.transformations()...) + transformationsIntersection := baseTransformationsSet.Intersection(replyTransformationsSet) + + replyTransformations := rs.EventTransformations.transformations() + + if rs.Discard != nil && *rs.Discard { + replyTransformations = append(replyTransformations, "discard") + } + if len(replyTransformations) > 1 { + errs = apis.ErrMultipleOneOf(replyTransformations...) + } else if replyTransformationsSet.Len() > 0 && + baseTransformationsSet.Len() > 0 && + transformationsIntersection.Len() != 1 { + errs = apis.ErrGeneric( + fmt.Sprintf( + "Reply transformation type must match the transformation type in the top-level spec. Top-level transformations: %#v, reply transformations: %#v", + strings.Join(baseTransformationsSet.UnsortedList(), ", "), + strings.Join(replyTransformationsSet.UnsortedList(), ", "), + ), + replyTransformationsSet.UnsortedList()..., + ) } return errs @@ -64,16 +121,31 @@ func (js *JsonataEventTransformationSpec) Validate(context.Context) *apis.FieldE return nil } -func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original *EventTransform) *apis.FieldError { +func (in *EventTransformSpec) CheckImmutableFields(ctx context.Context, original *EventTransform) *apis.FieldError { if original == nil { return nil } + errs := in.EventTransformations.CheckImmutableFields(ctx, original.Spec.EventTransformations) + errs = errs.Also(in.Reply.CheckImmutableFields(ctx, original.Spec.Reply).ViaField("reply")) + return errs +} + +func (ets EventTransformations) CheckImmutableFields(ctx context.Context, original EventTransformations) *apis.FieldError { var errs *apis.FieldError - if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil { - errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type")) + const suggestion = "Suggestion: create a new transformation, migrate services to the new one, and delete this transformation." + + if ets.Jsonata != nil && original.Jsonata == nil { + errs = apis.ErrGeneric("Transformations types are immutable, jsonata transformation cannot be changed to a different transformation type. " + suggestion).ViaField("jsonata") + } else if original.Jsonata != nil && ets.Jsonata == nil { + errs = apis.ErrGeneric("Transformations types are immutable, transformation type cannot be changed to a jsonata transformation. " + suggestion).ViaField("jsonata") } return errs } + +func (rs *ReplySpec) CheckImmutableFields(_ context.Context, _ *ReplySpec) *apis.FieldError { + // ReplySpec is fully mutable. + return nil +} diff --git a/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go index 754ac87c684..ea365362329 100644 --- a/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/eventtransform_validation_test.go @@ -17,9 +17,14 @@ limitations under the License. package v1alpha1 import ( + "context" "testing" "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/ptr" "knative.dev/pkg/webhook/json" ) @@ -44,3 +49,327 @@ func TestJSONDecode(t *testing.T) { assert.Nil(t, err) } + +var sink = &duckv1.Destination{ + URI: apis.HTTP("example.com"), +} + +func TestEventTransform_Validate(t *testing.T) { + tests := []struct { + name string + in EventTransform + ctx context.Context + want *apis.FieldError + }{ + { + name: "empty", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{}, + Status: EventTransformStatus{}, + }, + ctx: context.Background(), + want: apis.ErrMissingOneOf("jsonata").ViaField("spec"), + }, + { + name: "jsonata valid", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: context.Background(), + want: nil, + }, + { + name: "jsonata with reply valid", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: context.Background(), + want: nil, + }, + { + name: "jsonata with reply discard valid", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + Reply: &ReplySpec{ + Discard: ptr.Bool(true), + }, + }, + Status: EventTransformStatus{}, + }, + ctx: context.Background(), + want: nil, + }, + { + name: "jsonata with reply, jsonata reply transformations and discard = true, invalid", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + Discard: ptr.Bool(true), + }, + }, + }, + ctx: context.Background(), + want: (&apis.FieldError{}).Also(apis.ErrMultipleOneOf("jsonata", "discard").ViaField("reply").ViaField("spec")), + }, + { + name: "jsonata update change expression", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: apis.WithinUpdate(context.Background(), &EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "1.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }), + want: nil, + }, + { + name: "transform jsonata change transformation type, have -> not have", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: apis.WithinUpdate(context.Background(), &EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{}, + }, + Status: EventTransformStatus{}, + }), + want: (&apis.FieldError{}). + Also( + apis.ErrGeneric("Transformations types are immutable, jsonata transformation cannot be changed to a different transformation type. Suggestion: create a new transformation, migrate services to the new one, and delete this transformation."). + ViaField("jsonata"), + ). + ViaField("spec"), + }, + { + name: "transform jsonata change reply transformation type, have -> not have", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: apis.WithinUpdate(context.Background(), &EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + Status: EventTransformStatus{}, + }), + want: nil, + }, + { + name: "transform jsonata change reply transformation type, jsonata expression -> discard", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + Sink: sink, + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: apis.WithinUpdate(context.Background(), &EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + Reply: &ReplySpec{ + Discard: ptr.Bool(true), + }, + }, + Status: EventTransformStatus{}, + }), + want: nil, + }, + { + name: "reply without sink", + in: EventTransform{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + }, + Spec: EventTransformSpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + Reply: &ReplySpec{ + EventTransformations: EventTransformations{ + Jsonata: &JsonataEventTransformationSpec{ + Expression: `{ "specversion": "2.0" }`, + }, + }, + }, + }, + Status: EventTransformStatus{}, + }, + ctx: context.Background(), + want: (&apis.FieldError{}).Also( + apis.ErrGeneric("reply is set without spec.sink", ""). + ViaField("reply"). + ViaField("spec"), + ), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got := tt.in.Validate(tt.ctx) + assert.Equalf(t, tt.want, tt.in.Validate(tt.ctx), "Validate(%v) = %v", tt.ctx, got) + }) + } +} diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 3df74897a33..03129c83eb8 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -328,6 +328,11 @@ func (in *EventTransformSpec) DeepCopyInto(out *EventTransformSpec) { *out = new(duckv1.Destination) (*in).DeepCopyInto(*out) } + if in.Reply != nil { + in, out := &in.Reply, &out.Reply + *out = new(ReplySpec) + (*in).DeepCopyInto(*out) + } in.EventTransformations.DeepCopyInto(&out.EventTransformations) return } @@ -419,6 +424,28 @@ func (in *JsonataEventTransformationStatus) DeepCopy() *JsonataEventTransformati return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReplySpec) DeepCopyInto(out *ReplySpec) { + *out = *in + in.EventTransformations.DeepCopyInto(&out.EventTransformations) + if in.Discard != nil { + in, out := &in.Discard, &out.Discard + *out = new(bool) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplySpec. +func (in *ReplySpec) DeepCopy() *ReplySpec { + if in == nil { + return nil + } + out := new(ReplySpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RequestReply) DeepCopyInto(out *RequestReply) { *out = *in diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go index 9bb59435999..23974bd1cff 100644 --- a/pkg/reconciler/eventtransform/resources_jsonata.go +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -37,12 +37,14 @@ import ( ) const ( - JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata" - JsonataResourcesLabelValue = "true" - JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash" - JsonataResourcesNameSuffix = "-jsonata" - JsonataExpressionDataKey = "jsonata-expression" - JsonataExpressionPath = "/etc/jsonata" + JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata" + JsonataResourcesLabelValue = "true" + JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash" + JsonataResourcesNameSuffix = "-jsonata" + JsonataExpressionDataKey = "jsonata-expression" + JsonataReplyExpressionDataKey = "jsonata-expression-reply" + + JsonataExpressionPath = "/etc/jsonata" JsonataResourcesSelector = JsonataResourcesLabelKey + "=" + JsonataResourcesLabelValue ) @@ -61,6 +63,11 @@ func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTran JsonataExpressionDataKey: transform.Spec.EventTransformations.Jsonata.Expression, }, } + + if transform.Spec.Reply != nil && transform.Spec.Reply.Jsonata != nil { + expression.Data[JsonataReplyExpressionDataKey] = transform.Spec.Reply.Jsonata.Expression + } + return expression } @@ -141,8 +148,39 @@ func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transfor }, } - // When the expression changes, this rolls out a new deployment with the latest ConfigMap data. - hash := sha256.Sum256([]byte(transform.Spec.EventTransformations.Jsonata.Expression)) + // hashPayload is used to detect and roll out a new deployment when expressions change. + hashPayload := transform.Spec.EventTransformations.Jsonata.Expression + + if transform.Spec.Reply != nil { + if transform.Spec.Reply.Discard != nil { + if *transform.Spec.Reply.Discard { + d.Spec.Template.Spec.Containers[0].Env = append(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "JSONATA_DISCARD_RESPONSE_BODY", + Value: "true", + }) + } else { + d.Spec.Template.Spec.Containers[0].Env = append(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "JSONATA_DISCARD_RESPONSE_BODY", + Value: "false", + }) + } + } else { + d.Spec.Template.Spec.Containers[0].Env = append(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "JSONATA_DISCARD_RESPONSE_BODY", + Value: "false", + }) + } + + if transform.Spec.Reply.Jsonata != nil { + d.Spec.Template.Spec.Containers[0].Env = append(d.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "JSONATA_RESPONSE_TRANSFORM_FILE_NAME", + Value: filepath.Join(JsonataExpressionPath, JsonataReplyExpressionDataKey), + }) + hashPayload += transform.Spec.Reply.Jsonata.Expression + } + } + + hash := sha256.Sum256([]byte(hashPayload)) d.Annotations[JsonataExpressionHashKey] = base64.StdEncoding.EncodeToString(hash[:]) return d