Question: Could I use trigger's filter, to filter from "data" field? #8274

Open
usrdmn opened this issue Oct 24, 2024 · 6 comments

Labels
triage/accepted Issues which should be fixed (post-triage)

Comments

@usrdmn commented Oct 24, 2024

Hello!

I have a Kafka broker aggregating all my messages from different Kafka instances and topics using KafkaSources. My main goal is to deliver these messages to the appropriate functions (Sequences, Parallels, etc.) using Triggers.
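
Each KafkaSource points at the aggregating Broker, roughly like this (a minimal sketch; the source name, bootstrap servers, and topic are illustrative):

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: instance-a-source
  namespace: knative
spec:
  consumerGroup: instance-a-group
  bootstrapServers:
    - my-kafka-instance-a:9092   # illustrative broker address
  topics:
    - my-topic                   # illustrative topic
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: kafka-broker-external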

Initially, I thought it was only possible to filter on CloudEvent attributes, but then I found this document, which seems to describe filtering messages based on data fields.

Here's an example from the document:

spec:
  filter:
    expression: >
      user.id == "abc123"

This is similar to a YAML file I created for testing, which is supposed to invoke the test-function Service:

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: test-trigger-with-filter
  namespace: knative
spec:
  broker: kafka-broker-external
  filter:
    expression: >
      user.id == "abc123"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: test-function

However, when I try to apply this YAML file to my cluster, I get an error:

Error from server (BadRequest): error when creating "trigger-with-filter.yaml": Trigger in version "v1" cannot be handled as a Trigger: strict decoding error: unknown field "spec.filter.expression"

I have an almost default installation of Serving and Eventing.

To summarize, my question is: Do Knative Eventing triggers support filtering messages based on specific values in the data field?

@pierDipi (Member) commented:

Hi @usrdmn, filtering on the data field is not currently possible, but we would be open to a proposal for it. It seems this would be related to #8001 or #7704.
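
Today, a Trigger's filter only matches on CloudEvent context attributes, for example (a minimal sketch; the type value is illustrative):

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: attribute-filter-example
  namespace: knative
spec:
  broker: kafka-broker-external
  filter:
    attributes:
      type: dev.example.order.created   # illustrative CloudEvent type
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: test-function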

pierDipi added the triage/accepted label on Oct 24, 2024
@Cali0707 (Member) commented:

@pierDipi at least for the CESQL side of things, there was a lot of opposition to introducing the ability to filter on data when I brought it up in the serverless WG call a while ago, mostly about not tying the CESQL spec to a specific data format.
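
For reference, CESQL filter expressions likewise evaluate CloudEvent context attributes rather than the payload. A minimal sketch (the attribute values are illustrative, and depending on the Eventing version, spec.filters may need the new-trigger-filters experimental feature enabled):

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: cesql-filter-example
  namespace: knative
spec:
  broker: kafka-broker-external
  filters:
    - cesql: "type = 'order.created' AND source LIKE '%kafka%'"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: test-function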

@pierDipi (Member) commented:

@Cali0707 could we explore whether it can be extended with custom functions?

@Cali0707 (Member) commented:

> @Cali0707 could we explore whether it can be extended with custom functions?

Sure, do you want to discuss it in the WG call tomorrow, @pierDipi?

@pierDipi (Member) commented Feb 13, 2025

With the latest EventTransform API PR #8458, you should be able to extract arbitrary fields from the JSON data as CloudEvent attributes, or even set static attributes, using JSONata expressions:

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: identity
spec:
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "transformation.jsonata",
        "source": "transformation.json.identity",
        "customattribute": data.customattributeFromData,
        "data": $
      }

Without a spec.sink, the transformation result is sent back in the response, and you can use the built-in Broker response (reply) feature to re-publish the transformed event to the Broker so it can be consumed by other subscribers (be careful not to create loops).
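
For example, a Trigger can use the sink-less transform as its subscriber so the reply is re-published to the Broker; filtering out the transformed type is one way to avoid loops (a minimal sketch; the Trigger name is illustrative, the Broker and transform names come from this thread):

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: identity-transform-trigger
spec:
  broker: kafka-broker-external
  filters: # skip events that were already transformed to avoid loops
    - not:
        exact:
          type: transformation.jsonata
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: identity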

Alternatively, you can send the transformed event to a different service/endpoint, like any "source":

apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: identity-sink
spec:
  sink:
    uri: https://webhook.site/61b48d04-5629-471e-8d60-8b7038481f82
  jsonata:
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "transformation.jsonata",
        "source": "transformation.json.identity",
        "customattribute": data.customattributeFromData,
        "data": $
      }

EventTransform is addressable, which means you can transform events coming from any source/trigger/subscription, just like any other addressable resource:

apiVersion: sources.knative.dev/v1
kind: ApiServerSource
metadata:
  name: k8s-events
spec:
  serviceAccountName: event-watcher
  mode: Resource
  resources:
    - apiVersion: v1
      kind: Event
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: identity-sink

For this particular use case, you can:

apiVersion: sources.knative.dev/v1
kind: ...
metadata:
  name: my-source
spec:
  # ... fields omitted for brevity
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: user-id-extract-with-sink
---
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: user-id-extract-with-sink
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: kafka-broker-external
  jsonata: 
    # Ensure the expression produces a valid CloudEvent since the sink (Broker) only accepts CloudEvents
    # example payload `{"data": { "user": { "id": "abc123" } } }`
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "transformation.jsonata",
        "source": "transformation.json.identity",
        "userid": data.user.id, 
        "data": $
      }
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: kafka-broker-external
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: test-trigger-with-filter
spec:
  broker: kafka-broker-external
  filter:
    userid: "abc123"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: test-function

Or alternatively, you can:

apiVersion: sources.knative.dev/v1
kind: ...
metadata:
  name: my-source
spec:
  # ... fields omitted for brevity
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: kafka-broker-external
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: kafka-broker-external
---
apiVersion: eventing.knative.dev/v1alpha1
kind: EventTransform
metadata:
  name: user-id-extract
spec:
  jsonata: 
    # Ensure the expression produces a valid CloudEvent since the sink (Broker) only accepts CloudEvents
    # example payload `{"data": { "user": { "id": "abc123" } } }`
    expression: |
      {
        "specversion": "1.0",
        "id": id,
        "type": "useridextract",
        "source": "transformation.json.identity",
        "userid": data.user.id, 
        "data": $
      }
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: transform-trigger
spec:
  broker: kafka-broker-external
  filters: # filter out already transformed events to avoid loops
    - not:
        exact:
          type: useridextract
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: EventTransform
      name: user-id-extract
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: test-trigger-with-filter
spec:
  broker: kafka-broker-external
  filter:
    userid: "abc123"
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: test-function

@pierDipi (Member) commented Feb 13, 2025

We went with this approach, as opposed to built-in data filtering, because it resolves multiple issues at once while enabling more use cases for external and legacy integrations.
