Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RequestReply DataPlane changes #8446

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions pkg/apis/eventing/v1alpha1/requestreply_dataplane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package main

Check failure on line 1 in pkg/apis/eventing/v1alpha1/requestreply_dataplane.go

View workflow job for this annotation

GitHub Actions / style / Golang / Auto-format and Check

Please run goimports. diff --git a/pkg/apis/eventing/v1alpha1/requestreply_dataplane.go b/pkg/apis/eventing/v1alpha1/requestreply_dataplane.go index 667f844..75452e8 100644 --- a/pkg/apis/eventing/v1alpha1/requestreply_dataplane.go +++ b/pkg/apis/eventing/v1alpha1/requestreply_dataplane.go @@ -49,10 +49,10 @@ func (m *proxiedRequestMap) deleteEvent(event *cloudevents.Event) { func (m *proxiedRequestMap) HandleNewEvent(ctx context.Context, responseWriter http.ResponseWriter, event *cloudevents.Event) { fmt.Printf("handling new event: %s\n", event.String()) - originalID := event.ID() //generate signed id using event id - signedID := originalID + "_signed" //placeholder for signing - correlationID := fmt.Sprintf("%s:%s", originalID, signedID) //setting correlation id - event.SetExtension("correlationId", correlationID) + originalID := event.ID() //generate signed id using event id + signedID := originalID + "_signed" //placeholder for signing + correlationID := fmt.Sprintf("%s:%s", originalID, signedID) //setting correlation id + event.SetExtension("correlationId", correlationID) pr := m.addEvent(responseWriter, event) @@ -140,4 +140,4 @@ func main() { http.HandleFunc("/", proxyMap.HandleRequest) http.ListenAndServe(":8080", nil) -} \ No newline at end of file +}

Check failure on line 1 in pkg/apis/eventing/v1alpha1/requestreply_dataplane.go

View workflow job for this annotation

GitHub Actions / style / Golang / Boilerplate Check (go)

[Go headers] reported by reviewdog 🐶 missing boilerplate: Raw Output: pkg/apis/eventing/v1alpha1/requestreply_dataplane.go:1: missing boilerplate: /* Copyright 2025 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */

Check failure on line 1 in pkg/apis/eventing/v1alpha1/requestreply_dataplane.go

View workflow job for this annotation

GitHub Actions / analyze / Go vulnerability Detection

package main; expected package v1alpha1

Check failure on line 1 in pkg/apis/eventing/v1alpha1/requestreply_dataplane.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

package main; expected package v1alpha1 (typecheck)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file should be moved to cmd/requestreply/dataplane/main.go


import (
"context"
"fmt"
"net/http"
"os"
"sync"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

type proxiedRequest struct {
received time.Time
responseWriter http.ResponseWriter
replyEvent chan *cloudevents.Event
}

type proxiedRequestMap struct {
lock sync.RWMutex
entries map[string]*proxiedRequest
}

func (m *proxiedRequestMap) addEvent(responseWriter http.ResponseWriter, event *cloudevents.Event) *proxiedRequest {
m.lock.Lock()
defer m.lock.Unlock()

id := event.ID()
pr := &proxiedRequest{
received: time.Now(),
responseWriter: responseWriter,
replyEvent: make(chan *cloudevents.Event, 1),
}
m.entries[id] = pr
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of id, we should be using the id + signed id attribute from the relevant cloudevent extension attribute (accessed using the env var).


return pr
}

func (m *proxiedRequestMap) deleteEvent(event *cloudevents.Event) {
m.lock.Lock()
defer m.lock.Unlock()
delete(m.entries, event.ID())
}

func (m *proxiedRequestMap) HandleNewEvent(ctx context.Context, responseWriter http.ResponseWriter, event *cloudevents.Event) {
fmt.Printf("handling new event: %s\n", event.String())

originalID := event.ID() //generate signed id using event id
signedID := originalID + "_signed" //placeholder for signing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we should do the actual signing. We likely want to make a helper function like https://github.com/d3akhtar/eventing/blob/fefc79aa7d37180e6f67b6dde8ac0982e9f3ce5e/pkg/eventfilter/subscriptionsapi/cesql_correlationid_filter.go#L214-L227, except instead of decrypting the signed data, we want to encrypt it.

correlationID := fmt.Sprintf("%s:%s", originalID, signedID) //setting correlation id
event.SetExtension("correlationId", correlationID)

pr := m.addEvent(responseWriter, event)

c, err := cloudevents.NewClientHTTP(cehttp.WithTarget(os.Getenv("K_SINK")))
if err != nil {
fmt.Printf("failed to start a client: %s\n", err)
responseWriter.WriteHeader(http.StatusInternalServerError)
return
}

res := c.Send(ctx, *event)
if protocol.IsNACK(res) || protocol.IsUndelivered(res) {
fmt.Printf("failed to send event %s: %s\n", event.ID(), res.Error())
responseWriter.WriteHeader(http.StatusInternalServerError)
}

for {
select {
case resp := <-pr.replyEvent:
fmt.Printf("received a response event to event %s, sending back as response\n", event.ID())
msg := binding.ToMessage(resp)
err := cehttp.WriteResponseWriter(ctx, msg, http.StatusOK, pr.responseWriter)
if err != nil {
fmt.Printf("failed to send event back: %s\n", err.Error())
}
msg.Finish(err)
m.deleteEvent(event)
return
case <-ctx.Done():
fmt.Printf("context timeout reached before encountering a response to the event, discarding event %s\n", event.ID())
responseWriter.WriteHeader(http.StatusRequestTimeout) // handle timeout to send http408
m.deleteEvent(event)
return
}
}
}
func (m *proxiedRequestMap) HandleResponseEvent(ctx context.Context, responseWriter http.ResponseWriter, event *cloudevents.Event) {
m.lock.RLock()
defer m.lock.RUnlock()

fmt.Printf("handling a response event!\n")

responseWriter.WriteHeader(http.StatusAccepted)

id := event.Extensions()["replyAttribute"] //check if event has replyAttribute
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the cloud event extension which we access should be an environment variable, for example CE_REPLY_ATTRIBUTE_NAME

pr, ok := m.entries[id.(string)]
if !ok {
fmt.Printf("no event found corresponding to the response id %s, discarding\n", id)
return
}

// send the reply event back to the original response writer
fmt.Printf("found an event corresponding to the response id %s, replying\n", id)
pr.replyEvent <- event
}

func isResponseEvent(event *cloudevents.Event) bool {

_, ok := event.Extensions()["replyAttribute"] //checks if event has replyAttribute
return ok
}

func (m *proxiedRequestMap) HandleRequest(w http.ResponseWriter, req *http.Request) {
fmt.Printf("received a new request, handling it!")
event, err := cloudevents.NewEventFromHTTPRequest(req)
if err != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

if isResponseEvent(event) {
fmt.Printf("received a response event, handling it!\n")
m.HandleResponseEvent(ctx, w, event)
fmt.Printf("finished handling response event\n")
} else {
fmt.Printf("received a new event, handling it!\n")
m.HandleNewEvent(ctx, w, event)
fmt.Printf("finished handling new event\n")
}
}

func main() {
proxyMap := &proxiedRequestMap{entries: make(map[string]*proxiedRequest)}
http.HandleFunc("/", proxyMap.HandleRequest)

http.ListenAndServe(":8080", nil)
}

Check failure on line 143 in pkg/apis/eventing/v1alpha1/requestreply_dataplane.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

[EOF Newline] reported by reviewdog 🐶 Missing newline Raw Output: pkg/apis/eventing/v1alpha1/requestreply_dataplane.go:143: Missing newline
Loading