diff --git a/cmd/controller/main.go b/cmd/controller/main.go index cbbc8bc5c8d..a7e38bc087f 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -25,10 +25,13 @@ import ( filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" "knative.dev/pkg/signals" + eventingfilteredfactory "knative.dev/eventing/pkg/client/injection/informers/factory/filtered" + "knative.dev/eventing/pkg/apis/sinks" "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/reconciler/eventpolicy" + "knative.dev/eventing/pkg/reconciler/eventtransform" "knative.dev/eventing/pkg/reconciler/jobsink" "knative.dev/eventing/pkg/reconciler/apiserversource" @@ -53,6 +56,11 @@ func main() { auth.OIDCLabelSelector, eventingtls.TrustBundleLabelSelector, sinks.JobSinkJobsLabelSelector, + eventtransform.JsonataResourcesSelector, + ) + + ctx = eventingfilteredfactory.WithSelectors(ctx, + eventtransform.JsonataResourcesSelector, ) sharedmain.MainWithContext(ctx, "controller", @@ -84,5 +92,8 @@ func main() { // Sugar sugarnamespace.NewController, sugartrigger.NewController, + + // Transform + eventtransform.NewController, ) } diff --git a/cmd/schema/main.go b/cmd/schema/main.go index fed2f8f8d92..6bb7c794a1d 100644 --- a/cmd/schema/main.go +++ b/cmd/schema/main.go @@ -49,6 +49,7 @@ func main() { registry.Register(&flowsv1.Sequence{}) registry.Register(&flowsv1.Parallel{}) registry.Register(&eventingv1alpha1.EventPolicy{}) + registry.Register(&eventingv1alpha1.EventTransform{}) if err := commands.New("knative.dev/eventing").Execute(); err != nil { log.Fatal("Error during command execution: ", err) diff --git a/config/300-eventtransform.yaml b/config/300-eventtransform.yaml new file mode 120000 index 00000000000..3564527fef6 --- /dev/null +++ b/config/300-eventtransform.yaml @@ -0,0 +1 @@ +./core/resources/eventtransform.yaml \ No newline at end of file diff --git a/config/core/resources/eventtransform.yaml b/config/core/resources/eventtransform.yaml new file mode 100644 index 00000000000..19bce65c3d4 --- /dev/null +++ b/config/core/resources/eventtransform.yaml @@ -0,0 +1,259 @@ +# Copyright 2025 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: eventtransforms.eventing.knative.dev + labels: + knative.dev/crd-install: "true" + app.kubernetes.io/version: devel + app.kubernetes.io/name: knative-eventing +spec: + group: eventing.knative.dev + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: { } + schema: + openAPIV3Schema: + type: object + properties: + spec: + description: Spec defines the desired state of the EventTransform. + type: object + properties: + jsonata: + type: object + properties: + expression: + description: Expression is the JSONata expression. + type: string + sink: + description: 'Sink is a reference to an object that will resolve to a uri to use as the sink. If not present, the transformation will send back the transformed event as response, this is useful to leverage the built-in Broker reply feature to re-publish a transformed event back to the broker. ' + type: object + properties: + CACerts: + description: CACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. If set, these CAs are appended to the set of CAs provided by the Addressable target, if any. + type: string + audience: + description: Audience is the OIDC audience. This need only be set, if the target is not an Addressable and thus the Audience can't be received from the Addressable itself. In case the Addressable specifies an Audience too, the Destinations Audience takes preference. + type: string + ref: + description: Ref points to an Addressable. + type: object + properties: + address: + description: Address points to a specific Address Name. + type: string + apiVersion: + description: API version of the referent. + type: string + group: + description: 'Group of the API, without the version of the group. This can be used as an alternative to the APIVersion, and then resolved using ResolveGroup. Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086' + type: string + kind: + description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + name: + description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + namespace: + description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' + type: string + uri: + description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. + type: string + status: + description: Status represents the current state of the EventTransform. This data may be out of date. + type: object + properties: + address: + description: Address is a single Addressable address. If Addresses is present, Address will be ignored by clients. + type: object + required: + - url + properties: + CACerts: + description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + audience: + description: Audience is the OIDC audience for this address. + type: string + name: + description: Name is the name of the address. + type: string + url: + type: string + addresses: + description: Addresses is a list of addresses for different protocols (HTTP and HTTPS) If Addresses is present, Address must be ignored by clients. + type: array + items: + type: object + required: + - url + properties: + CACerts: + description: CACerts is the Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + audience: + description: Audience is the OIDC audience for this address. + type: string + name: + description: Name is the name of the address. + type: string + url: + type: string + annotations: + description: Annotations is additional Status fields for the Resource to save some additional State as well as convey more information to the user. This is roughly akin to Annotations on any k8s resource, just the reconciler conveying richer information outwards. + type: object + x-kubernetes-preserve-unknown-fields: true + auth: + description: Auth defines the attributes that provide the generated service account name in the resource status. + type: object + required: + - serviceAccountName + properties: + serviceAccountName: + description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. + type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. This list can have len() > 1, when the component uses multiple identities (e.g. in case of a Parallel). + type: array + items: + type: string + conditions: + description: Conditions the latest available observations of a resource's current state. + type: array + items: + type: object + required: + - type + - status + properties: + lastTransitionTime: + description: LastTransitionTime is the last time the condition transitioned from one status to another. We use VolatileTime in place of metav1.Time to exclude this from creating equality.Semantic differences (all other things held constant). + type: string + message: + description: A human readable message indicating details about the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + severity: + description: Severity with which to treat failures of this type of condition. When this is not specified, it defaults to Error. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition. + type: string + jsonata: + description: JsonataTransformationStatus is the status associated with JsonataEventTransformationSpec. + type: object + properties: + deployment: + type: object + properties: + availableReplicas: + description: Total number of available pods (ready for at least minReadySeconds) targeted by this deployment. + type: integer + format: int32 + collisionCount: + description: Count of hash collisions for the Deployment. The Deployment controller uses this field as a collision avoidance mechanism when it needs to create the name for the newest ReplicaSet. + type: integer + format: int32 + conditions: + description: Represents the latest available observations of a deployment's current state. + type: array + items: + type: object + properties: + lastTransitionTime: + description: Last time the condition transitioned from one status to another. + type: string + lastUpdateTime: + description: The last time this condition was updated. + type: string + message: + description: A human readable message indicating details about the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of deployment condition. + type: string + observedGeneration: + description: The generation observed by the deployment controller. + type: integer + format: int64 + readyReplicas: + description: readyReplicas is the number of pods targeted by this Deployment with a Ready Condition. + type: integer + format: int32 + replicas: + description: Total number of non-terminated pods targeted by this deployment (their labels match the selector). + type: integer + format: int32 + unavailableReplicas: + description: Total number of unavailable pods targeted by this deployment. This is the total number of pods that are still required for the deployment to have 100% available capacity. They may either be pods that are running but not yet available or pods that still have not been created. + type: integer + format: int32 + updatedReplicas: + description: Total number of non-terminated pods targeted by this deployment that have the desired template spec. + type: integer + format: int32 + observedGeneration: + description: ObservedGeneration is the 'Generation' of the Service that was last processed by the controller. + type: integer + format: int64 + sinkAudience: + description: SinkAudience is the OIDC audience of the sink. + type: string + sinkCACerts: + description: SinkCACerts are Certification Authority (CA) certificates in PEM format according to https://www.rfc-editor.org/rfc/rfc7468. + type: string + sinkUri: + description: SinkURI is the current active sink URI that has been configured for the Source. + type: string + + additionalPrinterColumns: + - name: URL + type: string + jsonPath: ".status.address.url" + - name: Sink + type: string + jsonPath: ".status.sinkUri" + - name: Ready + type: string + jsonPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + jsonPath: ".status.conditions[?(@.type==\"Ready\")].reason" + names: + kind: EventTransform + plural: eventtransforms + singular: eventtransform + categories: + - all + - knative + - eventing + scope: Namespaced diff --git a/config/core/roles/addressable-resolvers-clusterrole.yaml b/config/core/roles/addressable-resolvers-clusterrole.yaml index af8f574c443..3ca1f41d2f4 100644 --- a/config/core/roles/addressable-resolvers-clusterrole.yaml +++ b/config/core/roles/addressable-resolvers-clusterrole.yaml @@ -188,3 +188,25 @@ rules: - get - list - watch + +--- + +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: eventtransforms-addressable-resolver + labels: + duck.knative.dev/addressable: "true" + app.kubernetes.io/version: devel + app.kubernetes.io/name: knative-eventing +# Do not use this role directly. These rules will be added to the "addressable-resolver" role. +rules: + - apiGroups: + - eventing.knative.dev + resources: + - eventtransforms + - eventtransforms/status + verbs: + - get + - list + - watch diff --git a/config/core/roles/controller-clusterroles.yaml b/config/core/roles/controller-clusterroles.yaml index eef3298792b..9b6ebc335b2 100644 --- a/config/core/roles/controller-clusterroles.yaml +++ b/config/core/roles/controller-clusterroles.yaml @@ -99,6 +99,8 @@ rules: - "eventtypes/status" - "eventpolicies" - "eventpolicies/status" + - "eventtransforms" + - "eventtransforms/status" verbs: - "get" - "list" diff --git a/pkg/reconciler/eventtransform/controller.go b/pkg/reconciler/eventtransform/controller.go new file mode 100644 index 00000000000..b2cb16612f3 --- /dev/null +++ b/pkg/reconciler/eventtransform/controller.go @@ -0,0 +1,100 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + + "k8s.io/apimachinery/pkg/types" + kubeclient "knative.dev/pkg/client/injection/kube/client" + deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered" + configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" + "knative.dev/pkg/configmap" + "knative.dev/pkg/controller" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/logging" + + "knative.dev/eventing/pkg/apis/feature" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + eventtransformeryinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtransform" + sinkbindinginformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/sinkbinding/filtered" + "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1alpha1/eventtransform" +) + +const ( + NameLabelKey = "eventing.knative.dev/event-transform-name" +) + +func NewController( + ctx context.Context, + cmw configmap.Watcher, +) *controller.Impl { + + eventTransformInformer := eventtransformeryinformer.Get(ctx) + jsonataConfigMapInformer := configmapinformer.Get(ctx, JsonataResourcesSelector) + jsonataDeploymentInformer := deploymentinformer.Get(ctx, JsonataResourcesSelector) + jsonataSinkBindingInformer := sinkbindinginformer.Get(ctx, JsonataResourcesSelector) + + var globalResync func() + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + featureStore.WatchConfigs(cmw) + + r := &Reconciler{ + k8s: kubeclient.Get(ctx), + client: eventingclient.Get(ctx), + jsonataConfigMapLister: jsonataConfigMapInformer.Lister(), + jsonataDeploymentsLister: jsonataDeploymentInformer.Lister(), + jsonataSinkBindingLister: jsonataSinkBindingInformer.Lister(), + } + + impl := eventtransform.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) + + globalResync = func() { + impl.GlobalResync(eventTransformInformer.Informer()) + } + + eventTransformInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + jsonataDeploymentInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataConfigMapInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + jsonataSinkBindingInformer.Informer().AddEventHandler(controller.HandleAll(enqueueUsingNameLabel(impl))) + + return impl +} + +func enqueueUsingNameLabel(impl *controller.Impl) func(obj interface{}) { + return func(obj interface{}) { + acc, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + return + } + name, ok := acc.GetLabels()[NameLabelKey] + if !ok { + return + } + impl.EnqueueKey(types.NamespacedName{Namespace: acc.GetNamespace(), Name: name}) + } +} diff --git a/pkg/reconciler/eventtransform/eventtransform.go b/pkg/reconciler/eventtransform/eventtransform.go new file mode 100644 index 00000000000..2500cdd88c4 --- /dev/null +++ b/pkg/reconciler/eventtransform/eventtransform.go @@ -0,0 +1,224 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + appslister "k8s.io/client-go/listers/apps/v1" + corelister "k8s.io/client-go/listers/core/v1" + "knative.dev/pkg/controller" + "knative.dev/pkg/reconciler" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + sources "knative.dev/eventing/pkg/apis/sources/v1" + eventingclient "knative.dev/eventing/pkg/client/clientset/versioned" + sourceslisters "knative.dev/eventing/pkg/client/listers/sources/v1" +) + +type Reconciler struct { + k8s kubernetes.Interface + client eventingclient.Interface + + jsonataConfigMapLister corelister.ConfigMapLister + jsonataDeploymentsLister appslister.DeploymentLister + jsonataSinkBindingLister sourceslisters.SinkBindingLister +} + +func (r *Reconciler) ReconcileKind(ctx context.Context, transform *eventing.EventTransform) reconciler.Event { + if err := r.reconcileJsonataTransformation(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation: %w", err) + } + return nil +} + +func (r *Reconciler) reconcileJsonataTransformation(ctx context.Context, transform *eventing.EventTransform) error { + if transform.Spec.Transformations.Jsonata == nil { + return nil + } + + expressionCm, err := r.reconcileJsonataTransformationConfigMap(ctx, transform) + if err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) + } + if err := r.reconcileJsonataTransformationDeployment(ctx, expressionCm, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation deployment: %w", err) + } + if err := r.reconcileJsonataTransformationSinkBinding(ctx, transform); err != nil { + return fmt.Errorf("failed to reconcile Jsonata transformation sink binding: %w", err) + } + + return nil +} + +func (r *Reconciler) reconcileJsonataTransformationConfigMap(ctx context.Context, transform *eventing.EventTransform) (*corev1.ConfigMap, error) { + expected := jsonataExpressionConfigMap(ctx, transform) + + curr, err := r.jsonataConfigMapLister.ConfigMaps(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + return r.createConfigMap(ctx, transform, expected) + } + if err != nil { + return nil, fmt.Errorf("failed to get configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + return curr, nil + } + expected.ResourceVersion = curr.ResourceVersion + return r.updateConfigMap(ctx, transform, expected) +} + +func (r *Reconciler) reconcileJsonataTransformationDeployment(ctx context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) error { + expected := jsonataDeployment(ctx, expression, transform) + + curr, err := r.jsonataDeploymentsLister.Deployments(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + created, err := r.createDeployment(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataDeploymentStatus(created.Status) + return nil + } + if err != nil { + return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + transform.Status.PropagateJsonataDeploymentStatus(curr.Status) + return nil + } + expected.ResourceVersion = curr.ResourceVersion + updated, err := r.updateDeployment(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataDeploymentStatus(updated.Status) + return nil +} + +func (r *Reconciler) reconcileJsonataTransformationSinkBinding(ctx context.Context, transform *eventing.EventTransform) error { + if transform.Spec.Sink == nil { + transform.Status.PropagateJsonataSinkBindingUnset() + return r.deleteJsonataTransformationSinkBinding(ctx, transform) + } + + expected := jsonataSinkBinding(ctx, transform) + curr, err := r.jsonataSinkBindingLister.SinkBindings(expected.GetNamespace()).Get(expected.GetName()) + if apierrors.IsNotFound(err) { + created, err := r.createSinkBinding(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataSinkBindingStatus(created.Status) + return nil + } + if err != nil { + return fmt.Errorf("failed to get deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + if equality.Semantic.DeepDerivative(expected, curr) { + transform.Status.PropagateJsonataSinkBindingStatus(curr.Status) + return nil + } + expected.ResourceVersion = curr.ResourceVersion + updated, err := r.updateSinkBinding(ctx, transform, expected) + if err != nil { + return err + } + transform.Status.PropagateJsonataSinkBindingStatus(updated.Status) + return nil +} + +func (r *Reconciler) createDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { + created, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) createConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { + created, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataConfigMapCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) updateDeployment(ctx context.Context, transform *eventing.EventTransform, expected appsv1.Deployment) (*appsv1.Deployment, error) { + updated, err := r.k8s.AppsV1().Deployments(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update deployment %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataDeploymentUpdated", "", expected.GetName()) + return updated, nil +} + +func (r *Reconciler) updateConfigMap(ctx context.Context, transform *eventing.EventTransform, expected corev1.ConfigMap) (*corev1.ConfigMap, error) { + updated, err := r.k8s.CoreV1().ConfigMaps(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update configmap %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "ConfigMapUpdated", "", expected.GetName()) + return updated, nil +} + +func (r *Reconciler) deleteJsonataTransformationSinkBinding(ctx context.Context, transform *eventing.EventTransform) error { + sbName := jsonataSinkBindingName(transform) + _, err := r.jsonataSinkBindingLister.SinkBindings(transform.GetNamespace()).Get(sbName) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to get sink binding %s/%s: %w", transform.GetNamespace(), sbName, err) + } + + err = r.client.SourcesV1().SinkBindings(transform.GetNamespace()).Delete(ctx, sbName, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("failed to delete sink binding %s/%s: %w", transform.GetNamespace(), sbName, err) + } + return nil +} + +func (r *Reconciler) createSinkBinding(ctx context.Context, transform *eventing.EventTransform, expected sources.SinkBinding) (*sources.SinkBinding, error) { + created, err := r.client.SourcesV1().SinkBindings(expected.GetNamespace()).Create(ctx, &expected, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create jsonata sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingCreated", "", expected.GetName()) + return created, nil +} + +func (r *Reconciler) updateSinkBinding(ctx context.Context, transform *eventing.EventTransform, expected sources.SinkBinding) (*sources.SinkBinding, error) { + updated, err := r.client.SourcesV1().SinkBindings(expected.GetNamespace()).Update(ctx, &expected, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to update sink binding %s/%s: %w", expected.GetNamespace(), expected.GetName(), err) + } + controller.GetEventRecorder(ctx).Event(transform, "JsonataSinkBindingUpdated", "", expected.GetName()) + return updated, nil +} diff --git a/pkg/reconciler/eventtransform/resources_jsonata.go b/pkg/reconciler/eventtransform/resources_jsonata.go new file mode 100644 index 00000000000..6dc7e0ef980 --- /dev/null +++ b/pkg/reconciler/eventtransform/resources_jsonata.go @@ -0,0 +1,194 @@ +/* +Copyright 2025 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventtransform + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "os" + "path/filepath" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/ptr" + "knative.dev/pkg/tracker" + + eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" +) + +const ( + JsonataResourcesLabelKey = "eventing.knative.dev/event-transform-jsonata" + JsonataResourcesLabelValue = "true" + JsonataExpressionHashKey = "eventing.knative.dev/event-transform-jsonata-expression-hash" + JsonataResourcesNameSuffix = "jsonata" + JsonataExpressionDataKey = "jsonata-expression" + JsonataExpressionPath = "/etc/jsonata" + + JsonataResourcesSelector = JsonataResourcesLabelKey + "=" + JsonataResourcesLabelValue +) + +func jsonataExpressionConfigMap(_ context.Context, transform *eventing.EventTransform) corev1.ConfigMap { + expression := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Data: map[string]string{ + JsonataExpressionDataKey: transform.Spec.Transformations.Jsonata.Expression, + }, + } + return expression +} + +func jsonataDeployment(_ context.Context, expression *corev1.ConfigMap, transform *eventing.EventTransform) appsv1.Deployment { + image := os.Getenv("EVENT_TRANSFORM_JSONATA_IMAGE") + if image == "" { + panic("EVENT_TRANSFORM_JSONATA_IMAGE must be set") + } + + d := appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix), + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + JsonataResourcesLabelKey: JsonataResourcesLabelValue, + NameLabelKey: transform.GetName(), + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + JsonataResourcesLabelKey: JsonataResourcesLabelValue, + NameLabelKey: transform.GetName(), + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "jsonata-event-transform", + Image: image, + Env: []corev1.EnvVar{ + { + Name: "JSONATA_TRANSFORM_FILE_NAME", + Value: filepath.Join(JsonataExpressionPath, JsonataExpressionDataKey), + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: expression.GetName(), + ReadOnly: true, + MountPath: JsonataExpressionPath, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: expression.GetName(), + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: expression.GetName(), + }, + Optional: ptr.Bool(false), + }, + }, + }, + }, + }, + }, + Strategy: appsv1.DeploymentStrategy{ + Type: appsv1.RollingUpdateDeploymentStrategyType, + RollingUpdate: &appsv1.RollingUpdateDeployment{ + MaxUnavailable: &intstr.IntOrString{ + Type: intstr.Int, + IntVal: 0, + }, + }, + }, + }, + } + + // When the expression changes, this rolls out a new deployment with the latest ConfigMap data. + hash := sha256.Sum256([]byte(transform.Spec.Transformations.Jsonata.Expression)) + d.Annotations[JsonataExpressionHashKey] = base64.StdEncoding.EncodeToString(hash[:]) + + return d +} + +func jsonataSinkBindingName(transform *eventing.EventTransform) string { + return kmeta.ChildName(transform.Name, JsonataResourcesNameSuffix) +} + +func jsonataSinkBinding(_ context.Context, transform *eventing.EventTransform) sourcesv1.SinkBinding { + name := jsonataSinkBindingName(transform) + sb := sourcesv1.SinkBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: transform.GetNamespace(), + Labels: jsonataLabels(transform), + Annotations: transform.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(transform), + }, + }, + Spec: sourcesv1.SinkBindingSpec{ + SourceSpec: duckv1.SourceSpec{ + Sink: *transform.Spec.Sink.DeepCopy(), + }, + BindingSpec: duckv1.BindingSpec{ + Subject: tracker.Reference{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + Namespace: transform.GetNamespace(), + Name: name, + }, + }, + }, + } + + return sb +} + +func jsonataLabels(transform *eventing.EventTransform) map[string]string { + labels := make(map[string]string, len(transform.Labels)+2) + for k, v := range transform.Labels { + labels[k] = v + } + labels[JsonataResourcesLabelKey] = JsonataResourcesLabelValue + labels[NameLabelKey] = transform.GetName() + return labels +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go b/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go new file mode 100644 index 00000000000..89de6fb7e46 --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered/deployment.go @@ -0,0 +1,65 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1 "k8s.io/client-go/informers/apps/v1" + filtered "knative.dev/pkg/client/injection/kube/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Apps().V1().Deployments() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1.DeploymentInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch k8s.io/client-go/informers/apps/v1.DeploymentInformer with selector %s from context.", selector) + } + return untyped.(v1.DeploymentInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9c9716a8a33..b78b98c46e1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1158,6 +1158,7 @@ knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1/mutatin knative.dev/pkg/client/injection/kube/informers/admissionregistration/v1/validatingwebhookconfiguration knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake +knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/filtered knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake knative.dev/pkg/client/injection/kube/informers/batch/v1/job/filtered