Skip to content

Commit

Permalink
EventTransform: Support transforming response from Sink (#8469)
Browse files Browse the repository at this point in the history
This allows:
- propagating the response from Sink
- discarding the response from Sink
- transforming the response from Sink

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Feb 17, 2025
1 parent 75195a5 commit cee6e8c
Show file tree
Hide file tree
Showing 6 changed files with 598 additions and 22 deletions.
85 changes: 84 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2858,6 +2858,21 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>reply</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.ReplySpec">
ReplySpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Reply is the configuration on how to handle responses from Sink.
It can only be set if Sink is set.</p>
</td>
</tr>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
Expand Down Expand Up @@ -3421,6 +3436,21 @@ back to the broker.</p>
</tr>
<tr>
<td>
<code>reply</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.ReplySpec">
ReplySpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Reply is the configuration on how to handle responses from Sink.
It can only be set if Sink is set.</p>
</td>
</tr>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
Expand Down Expand Up @@ -3512,7 +3542,7 @@ JsonataEventTransformationStatus
<h3 id="eventing.knative.dev/v1alpha1.EventTransformations">EventTransformations
</h3>
<p>
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>)
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>, <a href="#eventing.knative.dev/v1alpha1.ReplySpec">ReplySpec</a>)
</p>
<p>
</p>
Expand Down Expand Up @@ -3595,6 +3625,59 @@ Kubernetes apps/v1.DeploymentStatus
</tr>
</tbody>
</table>
<h3 id="eventing.knative.dev/v1alpha1.ReplySpec">ReplySpec
</h3>
<p>
(<em>Appears on:</em><a href="#eventing.knative.dev/v1alpha1.EventTransformSpec">EventTransformSpec</a>)
</p>
<p>
<p>ReplySpec is the configurations on how to handle responses from Sink.</p>
</p>
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>EventTransformations</code><br/>
<em>
<a href="#eventing.knative.dev/v1alpha1.EventTransformations">
EventTransformations
</a>
</em>
</td>
<td>
<p>
(Members of <code>EventTransformations</code> are embedded into this type.)
</p>
<p>EventTransformations for replies from the Sink, contain all possible transformations,
only one &ldquo;type&rdquo; can be used.</p>
<p>The used type must match the top-level transformation, if you need to mix transformation types,
use compositions and chain transformations together to achieve your desired outcome.</p>
</td>
</tr>
<tr>
<td>
<code>discard</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>Discard discards responses from Sink and return empty response body.</p>
<p>When set to false, it returns the exact sink response body.
When set to true, Discard is mutually exclusive with EventTransformations in the reply
section, it can either be discarded or transformed.</p>
<p>Default: false.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="eventing.knative.dev/v1alpha1.RequestReplySpec">RequestReplySpec
</h3>
<p>
Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/eventing/v1alpha1/eventtransform_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,37 @@ type EventTransformSpec struct {
// +optional
Sink *duckv1.Destination `json:"sink,omitempty"`

// Reply is the configuration on how to handle responses from Sink.
// It can only be set if Sink is set.
//
// +optional
Reply *ReplySpec `json:"reply,omitempty"`

// EventTransformations contain all possible transformations, only one "type" can be used.
EventTransformations `json:",inline"`
}

// ReplySpec is the configurations on how to handle responses from Sink.
type ReplySpec struct {
// EventTransformations for replies from the Sink, contain all possible transformations,
// only one "type" can be used.
//
// The used type must match the top-level transformation, if you need to mix transformation types,
// use compositions and chain transformations together to achieve your desired outcome.
EventTransformations `json:",inline"`

// Discard discards responses from Sink and return empty response body.
//
// When set to false, it returns the exact sink response body.
// When set to true, Discard is mutually exclusive with EventTransformations in the reply
// section, it can either be discarded or transformed.
//
// Default: false.
//
// +optional
Discard *bool `json:"discard,omitempty"`
}

type EventTransformations struct {
Jsonata *JsonataEventTransformationSpec `json:"jsonata,omitempty"`
}
Expand Down
98 changes: 85 additions & 13 deletions pkg/apis/eventing/v1alpha1/eventtransform_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package v1alpha1

import (
"context"
"fmt"
"strings"

"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/pkg/apis"
)

Expand All @@ -30,28 +33,82 @@ func (t *EventTransform) Validate(ctx context.Context) *apis.FieldError {
var possibleTransformations = []string{"jsonata"}

func (ts *EventTransformSpec) Validate(ctx context.Context) *apis.FieldError {
errs := ts.EventTransformations.Validate(ctx /* allowEmpty */, false)
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))
errs = errs.Also(ts.Reply.Validate(ctx, ts).ViaField("reply"))

if apis.IsInUpdate(ctx) {
original := apis.GetBaseline(ctx).(*EventTransform)
errs = errs.Also(ts.CheckImmutableFields(ctx, original))
}

return errs
}

func (ets EventTransformations) Validate(ctx context.Context, allowEmpty bool) *apis.FieldError {
var errs *apis.FieldError

// Only one type of transformation is allowed.
// These are transformations field paths.
transformations := ets.transformations()

if len(transformations) == 0 && !allowEmpty {
errs = apis.ErrMissingOneOf(possibleTransformations...)
} else if len(transformations) > 1 {
errs = apis.ErrMultipleOneOf(transformations...)
}

errs = errs.Also(ets.Jsonata.Validate(ctx).ViaField("jsonata"))

return errs
}

func (ets EventTransformations) transformations() []string {
// Only one type of transformation is allowed.
// These are transformations field paths.
transformations := make([]string, 0, 2)

if ts.EventTransformations.Jsonata != nil {
if ets.Jsonata != nil {
transformations = append(transformations, "jsonata")
}
return transformations
}

if len(transformations) == 0 {
errs = errs.Also(apis.ErrMissingOneOf(possibleTransformations...))
} else if len(transformations) > 1 {
errs = errs.Also(apis.ErrMultipleOneOf(transformations...))
func (rs *ReplySpec) Validate(ctx context.Context, ts *EventTransformSpec) *apis.FieldError {
if rs == nil {
return nil
}
if ts.Sink == nil {
return apis.ErrGeneric(
"reply is set without spec.sink",
"",
)
}

errs = errs.Also(ts.EventTransformations.Jsonata.Validate(ctx).ViaField("jsonata"))
errs = errs.Also(ts.Sink.Validate(ctx).ViaField("sink"))
errs := rs.EventTransformations.Validate(ctx /* allowEmpty */, true)

if apis.IsInUpdate(ctx) {
original := apis.GetBaseline(ctx).(*EventTransform)
errs = errs.Also(ts.CheckImmutableFields(ctx, original))
baseTransformationsSet := sets.New(ts.EventTransformations.transformations()...)
replyTransformationsSet := sets.New(rs.EventTransformations.transformations()...)
transformationsIntersection := baseTransformationsSet.Intersection(replyTransformationsSet)

replyTransformations := rs.EventTransformations.transformations()

if rs.Discard != nil && *rs.Discard {
replyTransformations = append(replyTransformations, "discard")
}
if len(replyTransformations) > 1 {
errs = apis.ErrMultipleOneOf(replyTransformations...)
} else if replyTransformationsSet.Len() > 0 &&
baseTransformationsSet.Len() > 0 &&
transformationsIntersection.Len() != 1 {
errs = apis.ErrGeneric(
fmt.Sprintf(
"Reply transformation type must match the transformation type in the top-level spec. Top-level transformations: %#v, reply transformations: %#v",
strings.Join(baseTransformationsSet.UnsortedList(), ", "),
strings.Join(replyTransformationsSet.UnsortedList(), ", "),
),
replyTransformationsSet.UnsortedList()...,
)
}

return errs
Expand All @@ -64,16 +121,31 @@ func (js *JsonataEventTransformationSpec) Validate(context.Context) *apis.FieldE
return nil
}

func (in *EventTransformSpec) CheckImmutableFields(_ context.Context, original *EventTransform) *apis.FieldError {
func (in *EventTransformSpec) CheckImmutableFields(ctx context.Context, original *EventTransform) *apis.FieldError {
if original == nil {
return nil
}

errs := in.EventTransformations.CheckImmutableFields(ctx, original.Spec.EventTransformations)
errs = errs.Also(in.Reply.CheckImmutableFields(ctx, original.Spec.Reply).ViaField("reply"))
return errs
}

func (ets EventTransformations) CheckImmutableFields(ctx context.Context, original EventTransformations) *apis.FieldError {
var errs *apis.FieldError

if original.Spec.EventTransformations.Jsonata != nil && in.EventTransformations.Jsonata == nil {
errs = errs.Also(apis.ErrGeneric("transformations types are immutable, jsonata transformation cannot be changed to a different transformation type"))
const suggestion = "Suggestion: create a new transformation, migrate services to the new one, and delete this transformation."

if ets.Jsonata != nil && original.Jsonata == nil {
errs = apis.ErrGeneric("Transformations types are immutable, jsonata transformation cannot be changed to a different transformation type. " + suggestion).ViaField("jsonata")
} else if original.Jsonata != nil && ets.Jsonata == nil {
errs = apis.ErrGeneric("Transformations types are immutable, transformation type cannot be changed to a jsonata transformation. " + suggestion).ViaField("jsonata")
}

return errs
}

func (rs *ReplySpec) CheckImmutableFields(_ context.Context, _ *ReplySpec) *apis.FieldError {
// ReplySpec is fully mutable.
return nil
}
Loading

0 comments on commit cee6e8c

Please sign in to comment.