ApiServerSource: add optional selection criteria to ceOverride.extensions based on fields in the cloudevent #8001

Open
skoved opened this issue Jun 13, 2024 · 9 comments
Labels
kind/feature-request triage/accepted Issues which should be fixed (post-triage)

Comments

@skoved

skoved commented Jun 13, 2024

Problem
Our use case involves performing different actions based on the state of different resources on our k8s cluster.

Example: send a notification for Jobs that have failed (i.e., where status.failed > 0).

We're using ApiServerSource to receive cloudevents about different types of resources on the cluster. We would like the ability to subscribe to cloudevents for resources that are in a specific state. I think this could be accomplished by adding a field to ApiServerSource that contains criteria for when a specific spec.ceOverrides.extensions entry should be applied. Combined with spec.mode: Resource, this would allow us to use either new trigger filters or maybe even filters on ApiServerSource itself so that our sinks only receive cloudevents where an action is required.

Persona:
Which persona is this feature for?

  • Event Consumer
  • Event Producer

Exit Criteria
Events meeting the specified criteria have the appropriate custom attribute applied.

@muskan2622

/assign

@pierDipi
Member

pierDipi commented Jun 17, 2024

This feature might be similar to or combined with #7704

If we allowed you to define lightweight transformations on a JSON-represented event using JSONPath, I think it would solve both:

ceOverrides:
  jsonTransform:
    - from: .data.status.failed
      to: jobfailedstatus
    - from: .data.status.conditions[...].type
      to: completed

@skoved
Author

skoved commented Jun 17, 2024

Thanks a bunch for accepting this request. I just wanna check if my understanding is correct. These cloudevent extensions added by jsonTransform would be able to be used with the ApiServerSource filter (#7791), right?

@pierDipi
Member

Yes, the idea is that when we get an event, we would apply the ceOverrides (including jsonTransform) and then pass it through the defined filters.
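
To make the ordering concrete, here is a minimal sketch of "apply overrides first, then filter". It is not Knative code: the Event and Transform types, the dotted-path lookup into the data, and the exact-match filter are all simplified, hypothetical stand-ins for the CloudEvents SDK, the proposed jsonTransform, and the ApiServerSource filters.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a simplified, hypothetical stand-in for a CloudEvent.
type Event struct {
	Extensions map[string]interface{}
	Data       map[string]interface{}
}

// Transform loosely mirrors a proposed jsonTransform entry: copy the value at
// From (here a dotted path into Data, an assumption) to the extension To.
type Transform struct {
	From string
	To   string
}

// lookup walks a dotted path such as "status.failed" through nested maps.
func lookup(m map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = m
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = obj[key]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// applyOverrides sets one extension per transform whose path resolves.
func applyOverrides(e *Event, ts []Transform) {
	if e.Extensions == nil {
		e.Extensions = map[string]interface{}{}
	}
	for _, t := range ts {
		if v, ok := lookup(e.Data, t.From); ok {
			e.Extensions[t.To] = v
		}
	}
}

// matches is an exact-match filter over extensions. It assumes scalar
// (comparable) values; comparing maps or slices this way would panic.
func matches(e Event, want map[string]interface{}) bool {
	for k, v := range want {
		if got, ok := e.Extensions[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// process applies the overrides, then reports whether the event passes the filter.
func process(e *Event, ts []Transform, filter map[string]interface{}) bool {
	applyOverrides(e, ts)
	return matches(*e, filter)
}

func main() {
	e := Event{Data: map[string]interface{}{"status": map[string]interface{}{"failed": 1}}}
	ts := []Transform{{From: "status.failed", To: "jobfailedstatus"}}
	fmt.Println(process(&e, ts, map[string]interface{}{"jobfailedstatus": 1}))
}
```

Because the filter runs after the overrides, it can match on an extension that did not exist on the incoming event, which is what the use case above needs.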

@pierDipi
Member

pierDipi commented Jun 17, 2024

@muskan2622 since you have expressed interest in contributing, feel free to ask any questions. A few parts need to change, as it's a relatively large feature, but I'm happy to help/chat/etc.

Here is a very high-level idea for getting the field out of the event and then setting it as an extension:

import (
	"encoding/json"
	"fmt"
	"regexp"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	cetest "github.com/cloudevents/sdk-go/v2/test"
	"github.com/cloudevents/sdk-go/v2/types"
	"k8s.io/client-go/util/jsonpath"
)


type JSONTransform struct {
	From string // JSON path
	To   string // CE extension
	Type string // CE Extension type
}

func ExampleJsonTransform() {

	exampleTransform := JSONTransform{
		From: ".data.status.failed",
		To:   "failed",
		Type: "integer", // One of https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#type-system
	}
	c := cetest.FullEvent()
	err := c.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
		"status": map[string]interface{}{
			"failed": 1,
		},
	})
	if err != nil {
		panic(err)
	}

	// Core implementation
	// 1. Parse From
	// 2. Find From in JSON-serialized event
	// 3. Set extension based on Type

	expr, err := relaxedJSONPathExpression(exampleTransform.From)
	if err != nil {
		panic(err)
	}

	jp := jsonpath.New("Parser")
	if err := jp.Parse(expr); err != nil {
		panic(err)
	}

	b, err := c.MarshalJSON()
	if err != nil {
		panic(err)
	}
	var data map[string]interface{}
	err = json.Unmarshal(b, &data)
	if err != nil {
		panic(err)
	}

	results, err := jp.FindResults(data)
	if err != nil {
		panic(err)
	}

	// Use || so that an empty result set is rejected before results[0] is indexed.
	if len(results) != 1 || len(results[0]) != 1 {
		panic("expected 1 result")
	}

	// TODO: properly handle results and JSONTransform.Type with "results[0][0].Elem().CanInt()", "results[0][0].Elem().CanFloat()" etc

	c.SetExtension(exampleTransform.To, int64(results[0][0].Elem().Float()))

	b, err = c.MarshalJSON()
	if err != nil {
		panic(err)
	}

	f, err := types.ToInteger(c.Extensions()[exampleTransform.To])
	if err != nil {
		panic(err)
	}
	fmt.Println(f)
	// Output: 1
}

var jsonRegexp = regexp.MustCompile(`^\{\.?([^{}]+)}$|^\.?([^{}]+)$`)

// relaxedJSONPathExpression attempts to be flexible with JSONPath expressions, it accepts:
//   - metadata.name (no leading '.' or curly braces '{...}'
//   - {metadata.name} (no leading '.')
//   - .metadata.name (no curly braces '{...}')
//   - {.metadata.name} (complete expression)
//
// And transforms them all into a valid jsonpath expression:
//
//	{.metadata.name}
//
// Copied from https://github.com/kubernetes/kubectl/blob/a70106d6a8b4fc24633f7020b9fdc416648e7f22/pkg/cmd/get/customcolumn.go#L38-L67
// Copyright 2014 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
func relaxedJSONPathExpression(pathExpression string) (string, error) {
	if len(pathExpression) == 0 {
		return pathExpression, nil
	}
	submatches := jsonRegexp.FindStringSubmatch(pathExpression)
	if submatches == nil {
		return "", fmt.Errorf("unexpected path string, expected a 'name1.name2' or '.name1.name2' or '{name1.name2}' or '{.name1.name2}'")
	}
	if len(submatches) != 3 {
		return "", fmt.Errorf("unexpected submatch list: %v", submatches)
	}
	var fieldSpec string
	if len(submatches[1]) != 0 {
		fieldSpec = submatches[1]
	} else {
		fieldSpec = submatches[2]
	}
	return fmt.Sprintf("{.%s}", fieldSpec), nil
}
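
One possible way to approach the TODO about JSONTransform.Type in the example above is sketched below. It is only an illustration using the standard library: the coerce helper is hypothetical, the lowercase type names follow the "integer" used in the example, and only three of the CloudEvents types are handled. It assumes the value was decoded by encoding/json into interface{}, so numbers arrive as float64.

```go
package main

import (
	"fmt"
	"math"
)

// coerce converts a value decoded by encoding/json (float64, bool or string)
// into a Go value for the requested extension type name.
// Hypothetical helper; the accepted type names are an assumption.
func coerce(v interface{}, ceType string) (interface{}, error) {
	switch ceType {
	case "integer":
		f, ok := v.(float64)
		if !ok {
			return nil, fmt.Errorf("expected a JSON number, got %T", v)
		}
		// CloudEvents Integer is a whole number in the signed 32-bit range.
		if f != math.Trunc(f) || f < math.MinInt32 || f > math.MaxInt32 {
			return nil, fmt.Errorf("%v is not a 32-bit integer", f)
		}
		return int32(f), nil
	case "boolean":
		b, ok := v.(bool)
		if !ok {
			return nil, fmt.Errorf("expected a JSON boolean, got %T", v)
		}
		return b, nil
	case "string":
		s, ok := v.(string)
		if !ok {
			return nil, fmt.Errorf("expected a JSON string, got %T", v)
		}
		return s, nil
	default:
		return nil, fmt.Errorf("unsupported extension type %q", ceType)
	}
}

func main() {
	// A whole number, a fractional number, and a string, all requested as "integer".
	for _, in := range []interface{}{float64(1), 1.5, "1"} {
		v, err := coerce(in, "integer")
		fmt.Println(v, err)
	}
}
```

In the example above, the input would presumably come from something like results[0][0].Interface(), and the returned value would be passed to c.SetExtension instead of the hard-coded int64 conversion.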

@muskan2622

Yes, sure @pierDipi.

@pierDipi
Member

pierDipi commented Aug 8, 2024

@muskan2622 do you have any updates on this issue?

@pierDipi
Member

With the latest EventTransform API PR #8458, you should be able to extract arbitrary fields from the JSON data as CloudEvent attributes, or even just set static attributes, using JSONata expressions:

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: identity
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "transformation.jsonata",
        "source": "transformation.json.identity",
        "customattribute": data.customattributeFromData,
        "data": $
      }

Without a spec.sink, the transformed event is sent back in the response, and you can use the built-in Broker response feature to re-publish it to the Broker to be further consumed by other subscribers (be careful not to create loops).
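
As a rough way to reason about the loop risk: a reply would be routed back to the same transformation if it still matches the filter of the Trigger that delivered the original event. The sketch below models only plain exact matching on attributes; wouldLoop and the "example.original" type are hypothetical, and real Trigger filters have richer semantics than this.

```go
package main

import "fmt"

// wouldLoop reports whether a transformed event still matches the exact-match
// attribute filter of the trigger that routed the original event.
// Hypothetical helper that models only simple equality on attributes.
func wouldLoop(triggerFilter, transformed map[string]string) bool {
	for attr, want := range triggerFilter {
		if transformed[attr] != want {
			return false
		}
	}
	return true
}

func main() {
	// Attributes produced by the JSONata expression above.
	out := map[string]string{
		"type":   "transformation.jsonata",
		"source": "transformation.json.identity",
	}

	// The filter selects a different (hypothetical) type, so the reply is not re-delivered.
	fmt.Println(wouldLoop(map[string]string{"type": "example.original"}, out))

	// An empty filter matches every event, including the reply.
	fmt.Println(wouldLoop(map[string]string{}, out))
}
```

Under these assumptions, changing the type or source in the transformation and filtering the Trigger on the original values is one way to keep replies from re-entering the transformation.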

Alternatively, you can send the transformed event to a different service/endpoint, like any "source":

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: identity-sink
spec:
  sink:
    uri: https://webhook.site/61b48d04-5629-471e-8d60-8b7038481f82
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "transformation.jsonata",
        "source": "transformation.json.identity",
        "customattribute": data.customattributeFromData,
        "data": $
      }

EventTransform is addressable, which means you can use it, like any other addressable resource, to transform events coming from any source/trigger/subscription:

apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: k8s-events
spec:
  serviceAccountName: event-watcher
  mode: Resource
  resources:
    - apiVersion: v1
      kind: Event
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: identity-sink

@pierDipi
Member

pierDipi commented Feb 13, 2025

We went with this approach, as opposed to built-in Source extraction, because it resolves multiple issues at once while enabling many more use cases for external and legacy integrations.

@pierDipi pierDipi moved this from Icebox / Wishlist to In Progress in Eventing WG Roadmap Feb 13, 2025
Projects
Status: In Progress

3 participants