Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventTransform: Support transforming response from Sink #8469

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2858,6 +2858,21 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>reply</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.ReplySpec">
ReplySpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Reply is the configuration on how to handle responses from Sink.
It can only be set if Sink is set.</p>
</td>
</tr>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
Expand Down Expand Up @@ -3421,6 +3436,21 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>reply</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.ReplySpec">
ReplySpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Reply is the configuration on how to handle responses from Sink.
It can only be set if Sink is set.</p>
</td>
</tr>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
Expand Down Expand Up @@ -3512,7 +3542,7 @@ JsonataEventTransformationStatus
<h3 id="eventing.knative.dev/v1alpha1.EventTransformations">EventTransformations
</h3>
<p>
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>)
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>, <a href="#eventing.knative.dev/v1alpha1.ReplySpec">ReplySpec</a>)
</p>
<p>
</p>
Expand Down Expand Up @@ -3595,6 +3625,59 @@ Kubernetes apps/v1.DeploymentStatus
</tr>
</tbody>
</table>
<h3 id="eventing.knative.dev/v1alpha1.ReplySpec">ReplySpec
</h3>
<p>
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>)
</p>
<p>
<p>ReplySpec is the configurations on how to handle responses from Sink.</p>
</p>
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
EventTransformations
</a>
</em>
</td>
<td>
<p>
(Members of <code>EventTransformations</code> are embedded into this type.)
</p>
<p>EventTransformations for replies from the Sink, contain all possible transformations,
only one &ldquo;type&rdquo; can be used.</p>
<p>The used type must match the top-level transformation, if you need to mix transformation types,
use compositions and chain transformations together to achieve your desired outcome.</p>
</td>
</tr>
<tr>
<td>
<code>discard</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Discard discards responses from Sink and return empty response body.</p>
<p>When set to false, it returns the exact sink response body.
When set to true, Discard is mutually exclusive with EventTransformations in the reply
section, it can either be discarded or transformed.</p>
<p>Default: false.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="eventing.knative.dev/v1alpha1.RequestReplySpec">RequestReplySpec
</h3>
<p>
Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,37 @@ type EventTransformSpec struct {
// +optional
Sink *duckv1.Destination `json:"sink,omitempty"`

// Reply is the configuration on how to handle responses from Sink.
// It can only be set if Sink is set.
//
// +optional
Reply *ReplySpec `json:"reply,omitempty"`

// EventTransformations contain all possible transformations, only one "type" can be used.
EventTransformations `json:",inline"`
}

// ReplySpec is the configurations on how to handle responses from Sink.
type ReplySpec struct {
// EventTransformations for replies from the Sink, contain all possible transformations,
// only one "type" can be used.
//
// The used type must match the top-level transformation, if you need to mix transformation types,
// use compositions and chain transformations together to achieve your desired outcome.
EventTransformations `json:",inline"`

// Discard discards responses from Sink and return empty response body.
//
// When set to false, it returns the exact sink response body.
// When set to true, Discard is mutually exclusive with EventTransformations in the reply
// section, it can either be discarded or transformed.
//
// Default: false.
//
// +optional
Discard *bool `json:"discard,omitempty"`
}

type EventTransformations struct {
Jsonata *JsonataEventTransformationSpec `json:"jsonata,omitempty"`
}
Expand Down
98 changes: 85 additions & 13 deletions pkg/apis/eventing/v1alpha1/eventtransform_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package v1alpha1

import (
"context"
"fmt"
"strings"

"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"
)

Expand All @@ -30,28 +33,82 @@ func (t *EventTransform) Validate(ctx context.Context) *apis.FieldError {
var possibleTransformations = []string{"jsonata"}

func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
errs := ts.EventTransformations.Validate(ctx /* allowEmpty */, false)
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))
errs = errs.Also(ts.Reply.Validate(ctx, ts).ViaField("reply"))

if apis.IsInUpdate(ctx) {
original := apis.GetBaseline(ctx).(*EventTransform)
errs = errs.Also(ts.CheckImmutableFields(ctx, original))
}

return errs
}

func (ets EventTransformations) Validate(ctx context.Context, allowEmpty bool) *apis.FieldError {
var errs *apis.FieldError

// Only one type of transformation is allowed.
// These are transformations field paths.
transformations := ets.transformations()

if len(transformations) == 0 && !allowEmpty {
errs = apis.ErrMissingOneOf(possibleTransformations...)
} else if len(transformations) > 1 {
errs = apis.ErrMultipleOneOf(transformations...)
}

errs = errs.Also(ets.Jsonata.Validate(ctx).ViaField("jsonata"))

return errs
}

func (ets EventTransformations) transformations() []string {
// Only one type of transformation is allowed.
// These are transformations field paths.
transformations := make([]string, 0, 2)

if ts.EventTransformations.Jsonata != nil {
if ets.Jsonata != nil {
transformations = append(transformations, "jsonata")
}
return transformations
}

if len(transformations) == 0 {
errs = errs.Also(apis.ErrMissingOneOf(possibleTransformations...))
} else if len(transformations) > 1 {
errs = errs.Also(apis.ErrMultipleOneOf(transformations...))
func (rs *ReplySpec) Validate(ctx context.Context, ts *EventTransformSpec) *apis.FieldError {
if rs == nil {
return nil
}
if ts.Sink == nil {
return apis.ErrGeneric(
"reply is set without spec.sink",
"",
)
}

errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata"))
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))
errs := rs.EventTransformations.Validate(ctx /* allowEmpty */, true)

if apis.IsInUpdate(ctx) {
original := apis.GetBaseline(ctx).(*EventTransform)
errs = errs.Also(ts.CheckImmutableFields(ctx, original))
baseTransformationsSet := sets.New(ts.EventTransformations.transformations()...)
replyTransformationsSet := sets.New(rs.EventTransformations.transformations()...)
transformationsIntersection := baseTransformationsSet.Intersection(replyTransformationsSet)

replyTransformations := rs.EventTransformations.transformations()

if rs.Discard != nil && *rs.Discard {
replyTransformations = append(replyTransformations, "discard")
}
if len(replyTransformations) > 1 {
errs = apis.ErrMultipleOneOf(replyTransformations...)
} else if replyTransformationsSet.Len() > 0 &&
baseTransformationsSet.Len() > 0 &&
transformationsIntersection.Len() != 1 {
errs = apis.ErrGeneric(
fmt.Sprintf(
"Reply transformation type must match the transformation type in the top-level spec. Top-level transformations: %#v, reply transformations: %#v",
strings.Join(baseTransformationsSet.UnsortedList(), ", "),
strings.Join(replyTransformationsSet.UnsortedList(), ", "),
),
replyTransformationsSet.UnsortedList()...,
)
}

return errs
Expand All @@ -64,16 +121,31 @@ func (js *JsonataEventTransformationSpec) Validate(context.Context) *apis.FieldE
return nil
}

func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original *EventTransform) *apis.FieldError {
func (in *EventTransformSpec) CheckImmutableFields(ctx context.Context, original *EventTransform) *apis.FieldError {
if original == nil {
return nil
}

errs := in.EventTransformations.CheckImmutableFields(ctx, original.Spec.EventTransformations)
errs = errs.Also(in.Reply.CheckImmutableFields(ctx, original.Spec.Reply).ViaField("reply"))
return errs
}

func (ets EventTransformations) CheckImmutableFields(ctx context.Context, original EventTransformations) *apis.FieldError {
var errs *apis.FieldError

if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil {
errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type"))
const suggestion = "Suggestion: create a new transformation, migrate services to the new one, and delete this transformation."

if ets.Jsonata != nil && original.Jsonata == nil {
errs = apis.ErrGeneric("Transformations types are immutable, jsonata transformation cannot be changed to a different transformation type. " + suggestion).ViaField("jsonata")
} else if original.Jsonata != nil && ets.Jsonata == nil {
errs = apis.ErrGeneric("Transformations types are immutable, transformation type cannot be changed to a jsonata transformation. " + suggestion).ViaField("jsonata")
}

return errs
}

func (rs *ReplySpec) CheckImmutableFields(_ context.Context, _ *ReplySpec) *apis.FieldError {
// ReplySpec is fully mutable.
return nil
}
Loading
Loading