From 57ece4c8a316d14b34c8a5cc54c7d9d8d53eb690 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 10:14:14 +0100 Subject: [PATCH 1/7] Add EventTransform Jsonata reconciler Signed-off-by: Pierangelo Di Pilato --- pkg/reconciler/eventtransform/controller.go | 101 ++++++++ .../eventtransform/eventtransform.go | 224 ++++++++++++++++++ .../eventtransform/resources_jsonata.go | 176 ++++++++++++++ .../apps/v1/deployment/filtered/deployment.go | 65 +++++ vendor/modules.txt | 1 + 5 files changed, 567 insertions(+) create mode 100644 pkg/reconciler/eventtransform/controller.go create mode 100644 pkg/reconciler/eventtransform/eventtransform.go create mode 100644 pkg/reconciler/eventtransform/resources_jsonata.go create mode 100644 vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go new file mode 100644 index 00000000000..08da1a2db49 --- /dev/null +++ b/pkg/reconciler/eventtransform/controller.go @@ -0,0 +1,101 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/types" + kubeclient "knative.dev/pkg/client/injection/kube/client" + deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered" + configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/logging" + + "knative.dev/eventing/pkg/apis/feature" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + eventtransformeryinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtransform" + sinkbindinginformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/sinkbinding/filtered" + "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/eventtransform" +) + +const ( + NameLabelKey = "eventing.knative.dev/event-transform-name" +) + +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + + eventTransformInformer := eventtransformeryinformer.Get(ctx) + jsonataConfigMapInformer := configmapinformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) + jsonataDeploymentInformer := deploymentinformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) + jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) + + var globalResync func() + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + featureStore.WatchConfigs(cmw) + + r := &Reconciler{ + k8s: kubeclient.Get(ctx), + client: eventingclient.Get(ctx), + jsonataConfigMapLister: jsonataConfigMapInformer.Lister(), + jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(), + jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(), + } + + impl := eventtransform.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) + + globalResync = func() { + impl.GlobalResync(eventTransformInformer.Informer()) + } + + eventTransformInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + + return impl +} + +func enqueueUsingNameLabel(impl *controller.Impl) func(obj interface{}) { + return func(obj interface{}) { + acc, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + return + } + name, ok := acc.GetLabels()[NameLabelKey] + if !ok { + return + } + impl.EnqueueKey(types.NamespacedName{Namespace: acc.GetNamespace(), Name: name}) + } +} diff --git a/pkg/reconciler/eventtransform/eventtransform.go b/pkg/reconciler/eventtransform/eventtransform.go new file mode 100644 index 00000000000..2500cdd88c4 --- /dev/null +++ b/pkg/reconciler/eventtransform/eventtransform.go @@ -0,0 +1,224 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + appslister "k8s.io/client-go/listers/apps/v1" + corelister "k8s.io/client-go/listers/core/v1" + "knative.dev/pkg/controller" + "knative.dev/pkg/reconciler" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + sources "knative.dev/eventing/pkg/apis/sources/v1" + eventingclient "knative.dev/eventing/pkg/client/clientset/versioned" + sourceslisters "knative.dev/eventing/pkg/client/listers/sources/v1" +) + +type Reconciler struct { + k8s kubernetes.Interface + client eventingclient.Interface + + jsonataConfigMapLister corelister.ConfigMapLister + jsonataDeploymentsLister appslister.DeploymentLister + jsonataSinkBindingLister sourceslisters.SinkBindingLister +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, transform *eventing.EventTransform) reconciler.Event { + if err := r.reconcileJsonataTransformation(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation: %w", err) + } + return nil +} + +func (r *Reconciler) reconcileJsonataTransformation(ctx context.Context, transform *eventing.EventTransform) error { + if transform.Spec.Transformations.Jsonata == nil { + return nil + } + + expressionCm, err := r.reconcileJsonataTransformationConfigMap(ctx, transform) + if err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) + } + if err := r.reconcileJsonataTransformationDeployment(ctx, expressionCm, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) + } + if err := r.reconcileJsonataTransformationSinkBinding(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation sink binding: %w", err) + } + + return nil +} + +func (r *Reconciler) reconcileJsonataTransformationConfigMap(ctx context.Context, transform *eventing.EventTransform) (*corev1.ConfigMap, error) { + expected := jsonataExpressionConfigMap(ctx, transform) + + curr, err := r.jsonataConfigMapLister.ConfigMaps(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + return r.createConfigMap(ctx, transform, expected) + } + if err != nil { + return nil, fmt.Errorf("failed to get configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + return curr, nil + } + expected.ResourceVersion = curr.ResourceVersion + return r.updateConfigMap(ctx, transform, expected) +} + +func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) error { + expected := jsonataDeployment(ctx, expression, transform) + + curr, err := r.jsonataDeploymentsLister.Deployments(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + created, err := r.createDeployment(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataDeploymentStatus(created.Status) + return nil + } + if err != nil { + return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + transform.Status.PropagateJsonataDeploymentStatus(curr.Status) + return nil + } + expected.ResourceVersion = curr.ResourceVersion + updated, err := r.updateDeployment(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataDeploymentStatus(updated.Status) + return nil +} + +func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Context, transform *eventing.EventTransform) error { + if transform.Spec.Sink == nil { + transform.Status.PropagateJsonataSinkBindingUnset() + return r.deleteJsonataTransformationSinkBinding(ctx, transform) + } + + expected := jsonataSinkBinding(ctx, transform) + curr, err := r.jsonataSinkBindingLister.SinkBindings(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + created, err := r.createSinkBinding(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataSinkBindingStatus(created.Status) + return nil + } + if err != nil { + return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + transform.Status.PropagateJsonataSinkBindingStatus(curr.Status) + return nil + } + expected.ResourceVersion = curr.ResourceVersion + updated, err := r.updateSinkBinding(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataSinkBindingStatus(updated.Status) + return nil +} + +func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { + created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { + created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataConfigMapCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) updateDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { + updated, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentUpdated", "", expected.GetName()) + return updated, nil +} + +func (r *Reconciler) updateConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { + updated, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "ConfigMapUpdated", "", expected.GetName()) + return updated, nil +} + +func (r *Reconciler) deleteJsonataTransformationSinkBinding(ctx context.Context, transform *eventing.EventTransform) error { + sbName := jsonataSinkBindingName(transform) + _, err := r.jsonataSinkBindingLister.SinkBindings(transform.GetNamespace()).Get(sbName) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to get sink binding %s/%s: %w", transform.GetNamespace(), sbName, err) + } + + err = r.client.SourcesV1().SinkBindings(transform.GetNamespace()).Delete(ctx, sbName, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to delete sink binding %s/%s: %w", transform.GetNamespace(), sbName, err) + } + return nil +} + +func (r *Reconciler) createSinkBinding(ctx context.Context, transform *eventing.EventTransform, expected sources.SinkBinding) (*sources.SinkBinding, error) { + created, err := r.client.SourcesV1().SinkBindings(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) updateSinkBinding(ctx context.Context, transform *eventing.EventTransform, expected sources.SinkBinding) (*sources.SinkBinding, error) { + updated, err := r.client.SourcesV1().SinkBindings(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingUpdated", "", expected.GetName()) + return updated, nil +} diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go new file mode 100644 index 00000000000..2277124ab28 --- /dev/null +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -0,0 +1,176 @@ +package eventtransform + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "os" + "path/filepath" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/ptr" + "knative.dev/pkg/tracker" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" +) + +const ( + JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata" + JsonataResourcesLabelValue = "true" + JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash" + JsonataResourcesNameSuffix = "jsonata" + JsonataExpressionDataKey = "jsonata-expression" + JsonataExpressionPath = "/etc/jsonata" +) + +func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTransform) corev1.ConfigMap { + expression := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Data: map[string]string{ + JsonataExpressionDataKey: transform.Spec.Transformations.Jsonata.Expression, + }, + } + return expression +} + +func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) appsv1.Deployment { + image := os.Getenv("EVENT_TRANSFORM_JSONATA_IMAGE") + if image == "" { + panic("EVENT_TRANSFORM_JSONATA_IMAGE must be set") + } + + d := appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + JsonataResourcesLabelKey: JsonataResourcesLabelValue, + NameLabelKey: transform.GetName(), + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + JsonataResourcesLabelKey: JsonataResourcesLabelValue, + NameLabelKey: transform.GetName(), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "jsonata-event-transform", + Image: image, + Env: []corev1.EnvVar{ + { + Name: "JSONATA_TRANSFORM_FILE_NAME", + Value: filepath.Join(JsonataExpressionPath, JsonataExpressionDataKey), + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: expression.GetName(), + ReadOnly: true, + MountPath: JsonataExpressionPath, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: expression.GetName(), + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: expression.GetName(), + }, + Optional: ptr.Bool(false), + }, + }, + }, + }, + }, + }, + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: 0, + }, + }, + }, + }, + } + + // When the expression changes, this rolls out a new deployment with the latest ConfigMap data. + hash := sha256.Sum256([]byte(transform.Spec.Transformations.Jsonata.Expression)) + d.Annotations[JsonataExpressionHashKey] = base64.StdEncoding.EncodeToString(hash[:]) + + return d +} + +func jsonataSinkBindingName(transform *eventing.EventTransform) string { + return kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix) +} + +func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) sourcesv1.SinkBinding { + name := jsonataSinkBindingName(transform) + sb := sourcesv1.SinkBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Spec: sourcesv1.SinkBindingSpec{ + SourceSpec: duckv1.SourceSpec{ + Sink: *transform.Spec.Sink.DeepCopy(), + }, + BindingSpec: duckv1.BindingSpec{ + Subject: tracker.Reference{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + Namespace: transform.GetNamespace(), + Name: name, + }, + }, + }, + } + + return sb +} + +func jsonataLabels(transform *eventing.EventTransform) map[string]string { + labels := make(map[string]string, len(transform.Labels)+2) + for k, v := range transform.Labels { + labels[k] = v + } + labels[JsonataResourcesLabelKey] = JsonataResourcesLabelValue + labels[NameLabelKey] = transform.GetName() + return labels +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go b/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go new file mode 100644 index 00000000000..89de6fb7e46 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go @@ -0,0 +1,65 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1 "k8s.io/client-go/informers/apps/v1" + filtered "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Apps().V1().Deployments() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1.DeploymentInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch k8s.io/client-go/informers/apps/v1.DeploymentInformer with selector %s from context.", selector) + } + return untyped.(v1.DeploymentInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9c9716a8a33..b78b98c46e1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1158,6 +1158,7 @@ knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1/mutatin knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1/validatingwebhookconfiguration knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake +knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake knative.dev/pkg/client/injection/kube/informers/batch/v1/job/filtered From 4873ac25cd07dcbd25aaa2f1da98536e0b312340 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 10:34:46 +0100 Subject: [PATCH 2/7] Add CRD Signed-off-by: Pierangelo Di Pilato --- cmd/schema/main.go | 1 + config/core/resources/eventtransform.yaml | 259 ++++++++++++++++++++++ 2 files changed, 260 insertions(+) create mode 100644 config/core/resources/eventtransform.yaml diff --git a/cmd/schema/main.go b/cmd/schema/main.go index fed2f8f8d92..6bb7c794a1d 100644 --- a/cmd/schema/main.go +++ b/cmd/schema/main.go @@ -49,6 +49,7 @@ func main() { registry.Register(&flowsv1.Sequence{}) registry.Register(&flowsv1.Parallel{}) registry.Register(&eventingv1alpha1.EventPolicy{}) + registry.Register(&eventingv1alpha1.EventTransform{}) if err := commands.New("knative.dev/eventing").Execute(); err != nil { log.Fatal("Error during command execution: ", err) diff --git a/config/core/resources/eventtransform.yaml b/config/core/resources/eventtransform.yaml new file mode 100644 index 00000000000..19bce65c3d4 --- /dev/null +++ b/config/core/resources/eventtransform.yaml @@ -0,0 +1,259 @@ +# Copyright 2025 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: eventtransforms.eventing.knative.dev + labels: + knative.dev/crd-install: "true" + app.kubernetes.io/version: devel + app.kubernetes.io/name: knative-eventing +spec: + group: eventing.knative.dev + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: { } + schema: + openAPIV3Schema: + type: object + properties: + spec: + description: Spec defines the desired state of the EventTransform. + type: object + properties: + jsonata: + type: object + properties: + expression: + description: Expression is the JSONata expression. + type: string + sink: + description: 'Sink is a reference to an object that will resolve to a uri to use as the sink. If not present, the transformation will send back the transformed event as response, this is useful to leverage the built-in Broker reply feature to re-publish a transformed event back to the broker. ' + type: object + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. + type: string + audience: + description: Audience is the OIDC audience. This need only be set, if the target is not an Addressable and thus the Audience can't be received from the Addressable itself. In case the Addressable specifies an Audience too, the Destinations Audience takes preference. + type: string + ref: + description: Ref points to an Addressable. + type: object + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the group. This can be used as an alternative to the APIVersion, and then resolved using ResolveGroup. Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' + type: string + uri: + description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. + type: string + status: + description: Status represents the current state of the EventTransform. This data may be out of date. + type: object + properties: + address: + description: Address is a single Addressable address. If Addresses is present, Address will be ignored by clients. + type: object + required: + - url + properties: + CACerts: + description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + audience: + description: Audience is the OIDC audience for this address. + type: string + name: + description: Name is the name of the address. + type: string + url: + type: string + addresses: + description: Addresses is a list of addresses for different protocols (HTTP and HTTPS) If Addresses is present, Address must be ignored by clients. + type: array + items: + type: object + required: + - url + properties: + CACerts: + description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + audience: + description: Audience is the OIDC audience for this address. + type: string + name: + description: Name is the name of the address. + type: string + url: + type: string + annotations: + description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards. + type: object + x-kubernetes-preserve-unknown-fields: true + auth: + description: Auth defines the attributes that provide the generated service account name in the resource status. + type: object + required: + - serviceAccountName + properties: + serviceAccountName: + description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. + type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. This list can have len() > 1, when the component uses multiple identities (e.g. in case of a Parallel). + type: array + items: + type: string + conditions: + description: Conditions the latest available observations of a resource's current state. + type: array + items: + type: object + required: + - type + - status + properties: + lastTransitionTime: + description: LastTransitionTime is the last time the condition transitioned from one status to another. We use VolatileTime in place of metav1.Time to exclude this from creating equality.Semantic differences (all other things held constant). + type: string + message: + description: A human readable message indicating details about the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + severity: + description: Severity with which to treat failures of this type of condition. When this is not specified, it defaults to Error. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition. + type: string + jsonata: + description: JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec. + type: object + properties: + deployment: + type: object + properties: + availableReplicas: + description: Total number of available pods (ready for at least minReadySeconds) targeted by this deployment. + type: integer + format: int32 + collisionCount: + description: Count of hash collisions for the Deployment. The Deployment controller uses this field as a collision avoidance mechanism when it needs to create the name for the newest ReplicaSet. + type: integer + format: int32 + conditions: + description: Represents the latest available observations of a deployment's current state. + type: array + items: + type: object + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status to another. + type: string + lastUpdateTime: + description: The last time this condition was updated. + type: string + message: + description: A human readable message indicating details about the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of deployment condition. + type: string + observedGeneration: + description: The generation observed by the deployment controller. + type: integer + format: int64 + readyReplicas: + description: readyReplicas is the number of pods targeted by this Deployment with a Ready Condition. + type: integer + format: int32 + replicas: + description: Total number of non-terminated pods targeted by this deployment (their labels match the selector). + type: integer + format: int32 + unavailableReplicas: + description: Total number of unavailable pods targeted by this deployment. This is the total number of pods that are still required for the deployment to have 100% available capacity. They may either be pods that are running but not yet available or pods that still have not been created. + type: integer + format: int32 + updatedReplicas: + description: Total number of non-terminated pods targeted by this deployment that have the desired template spec. + type: integer + format: int32 + observedGeneration: + description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller. + type: integer + format: int64 + sinkAudience: + description: SinkAudience is the OIDC audience of the sink. + type: string + sinkCACerts: + description: SinkCACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + sinkUri: + description: SinkURI is the current active sink URI that has been configured for the Source. + type: string + + additionalPrinterColumns: + - name: URL + type: string + jsonPath: ".status.address.url" + - name: Sink + type: string + jsonPath: ".status.sinkUri" + - name: Ready + type: string + jsonPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + jsonPath: ".status.conditions[?(@.type==\"Ready\")].reason" + names: + kind: EventTransform + plural: eventtransforms + singular: eventtransform + categories: + - all + - knative + - eventing + scope: Namespaced From e65361a52cc9afbe92e683ec7c17ea6dfa5ef3e7 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 10:39:41 +0100 Subject: [PATCH 3/7] Add reconciler to controller binary Signed-off-by: Pierangelo Di Pilato --- cmd/controller/main.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index cbbc8bc5c8d..94aa8fd7b27 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -20,6 +20,8 @@ import ( // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "fmt" + "knative.dev/pkg/injection/sharedmain" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" @@ -29,6 +31,7 @@ import ( "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/reconciler/eventpolicy" + "knative.dev/eventing/pkg/reconciler/eventtransform" "knative.dev/eventing/pkg/reconciler/jobsink" "knative.dev/eventing/pkg/reconciler/apiserversource" @@ -53,6 +56,7 @@ func main() { auth.OIDCLabelSelector, eventingtls.TrustBundleLabelSelector, sinks.JobSinkJobsLabelSelector, + fmt.Sprintf("%s=%s", eventtransform.JsonataResourcesLabelKey, eventtransform.JsonataResourcesLabelValue), ) sharedmain.MainWithContext(ctx, "controller", @@ -84,5 +88,8 @@ func main() { // Sugar sugarnamespace.NewController, sugartrigger.NewController, + + // Transform + eventtransform.NewController, ) } From 59e82eefb43976c4540eac16a8052172cff5ae29 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 12:23:34 +0100 Subject: [PATCH 4/7] Add permissions Signed-off-by: Pierangelo Di Pilato --- .../addressable-resolvers-clusterrole.yaml | 22 +++++++++++++++++++ .../core/roles/controller-clusterroles.yaml | 2 ++ 2 files changed, 24 insertions(+) diff --git a/config/core/roles/addressable-resolvers-clusterrole.yaml b/config/core/roles/addressable-resolvers-clusterrole.yaml index af8f574c443..3ca1f41d2f4 100644 --- a/config/core/roles/addressable-resolvers-clusterrole.yaml +++ b/config/core/roles/addressable-resolvers-clusterrole.yaml @@ -188,3 +188,25 @@ rules: - get - list - watch + +--- + +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: eventtransforms-addressable-resolver + labels: + duck.knative.dev/addressable: "true" + app.kubernetes.io/version: devel + app.kubernetes.io/name: knative-eventing +# Do not use this role directly. These rules will be added to the "addressable-resolver" role. +rules: + - apiGroups: + - eventing.knative.dev + resources: + - eventtransforms + - eventtransforms/status + verbs: + - get + - list + - watch diff --git a/config/core/roles/controller-clusterroles.yaml b/config/core/roles/controller-clusterroles.yaml index eef3298792b..9b6ebc335b2 100644 --- a/config/core/roles/controller-clusterroles.yaml +++ b/config/core/roles/controller-clusterroles.yaml @@ -99,6 +99,8 @@ rules: - "eventtypes/status" - "eventpolicies" - "eventpolicies/status" + - "eventtransforms" + - "eventtransforms/status" verbs: - "get" - "list" From 41d49310700c6922da5b73f4bdd9c65729fd5b83 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 14:13:12 +0100 Subject: [PATCH 5/7] Add selector to eventing filtered factory Signed-off-by: Pierangelo Di Pilato --- cmd/controller/main.go | 10 +++++++--- pkg/reconciler/eventtransform/controller.go | 7 +++---- pkg/reconciler/eventtransform/resources_jsonata.go | 2 ++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 94aa8fd7b27..a7e38bc087f 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -20,13 +20,13 @@ import ( // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "fmt" - "knative.dev/pkg/injection/sharedmain" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/signals" + eventingfilteredfactory "knative.dev/eventing/pkg/client/injection/informers/factory/filtered" + "knative.dev/eventing/pkg/apis/sinks" "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" @@ -56,7 +56,11 @@ func main() { auth.OIDCLabelSelector, eventingtls.TrustBundleLabelSelector, sinks.JobSinkJobsLabelSelector, - fmt.Sprintf("%s=%s", eventtransform.JsonataResourcesLabelKey, eventtransform.JsonataResourcesLabelValue), + eventtransform.JsonataResourcesSelector, + ) + + ctx = eventingfilteredfactory.WithSelectors(ctx, + eventtransform.JsonataResourcesSelector, ) sharedmain.MainWithContext(ctx, "controller", diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go index 08da1a2db49..b2cb16612f3 100644 --- a/pkg/reconciler/eventtransform/controller.go +++ b/pkg/reconciler/eventtransform/controller.go @@ -18,7 +18,6 @@ package eventtransform import ( "context" - "fmt" "k8s.io/apimachinery/pkg/types" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -46,9 +45,9 @@ func NewController( ) *controller.Impl { eventTransformInformer := eventtransformeryinformer.Get(ctx) - jsonataConfigMapInformer := configmapinformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) - jsonataDeploymentInformer := deploymentinformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) - jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, fmt.Sprintf("%s=%s", JsonataResourcesLabelKey, JsonataResourcesLabelValue)) + jsonataConfigMapInformer := configmapinformer.Get(ctx, JsonataResourcesSelector) + jsonataDeploymentInformer := deploymentinformer.Get(ctx, JsonataResourcesSelector) + jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, JsonataResourcesSelector) var globalResync func() diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go index 2277124ab28..c3bdc94844e 100644 --- a/pkg/reconciler/eventtransform/resources_jsonata.go +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -27,6 +27,8 @@ const ( JsonataResourcesNameSuffix = "jsonata" JsonataExpressionDataKey = "jsonata-expression" JsonataExpressionPath = "/etc/jsonata" + + JsonataResourcesSelector = JsonataResourcesLabelKey + "=" + JsonataResourcesLabelValue ) func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTransform) corev1.ConfigMap { From 3862b7a9266b8240d47448a66be33319906802fe Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 14:13:52 +0100 Subject: [PATCH 6/7] Add License header Signed-off-by: Pierangelo Di Pilato --- .../eventtransform/resources_jsonata.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go index c3bdc94844e..6dc7e0ef980 100644 --- a/pkg/reconciler/eventtransform/resources_jsonata.go +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -1,3 +1,19 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package eventtransform import ( From 27fe8daee7481e950a1bbdbad96e9d3444026863 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 12 Feb 2025 14:21:06 +0100 Subject: [PATCH 7/7] Add symlink Signed-off-by: Pierangelo Di Pilato --- config/300-eventtransform.yaml | 1 + 1 file changed, 1 insertion(+) create mode 120000 config/300-eventtransform.yaml diff --git a/config/300-eventtransform.yaml b/config/300-eventtransform.yaml new file mode 120000 index 00000000000..3564527fef6 --- /dev/null +++ b/config/300-eventtransform.yaml @@ -0,0 +1 @@ +./core/resources/eventtransform.yaml \ No newline at end of file