Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: kapinger use service IP instead of name #1283

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ kubectl-retina-image:
CONTEXT_DIR=$(REPO_ROOT)

kapinger-image:
#docker buildx create --name=retina --append
docker buildx build --builder retina --platform windows/amd64 --target windows-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-windows-amd64 ./hack/tools/kapinger/ --push
docker buildx build --builder retina --platform linux/amd64 --target linux-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-amd64 ./hack/tools/kapinger/ --push
docker buildx build --builder retina --platform linux/arm64 --target linux-arm64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-arm64 ./hack/tools/kapinger/ --push
Expand Down
35 changes: 27 additions & 8 deletions hack/tools/kapinger/clients/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string
Transport: &http.Transport{
DisableKeepAlives: true,
},
Timeout: 3 * time.Second,
Timeout: 15 * time.Second,
},
labelselector: labelselector,
clientset: clientset,
Expand Down Expand Up @@ -80,22 +80,41 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string

func (k *KapingerHTTPClient) MakeRequests(ctx context.Context) error {
ticker := time.NewTicker(k.interval)
sem := make(chan struct{}, 5) // semaphore to limit the number of active goroutines

serviceController, err := newServiceController(k.clientset, k.labelselector)
if err != nil {
return fmt.Errorf("error creating service controller: %w", err)
}

go serviceController.run(ctx)

for {
select {
case <-ctx.Done():
log.Printf("HTTP client context done")
return nil
case <-ticker.C:
if len(sem) >= cap(sem) {
log.Printf("max http clients reached, skipping interval")
continue
}
go func() {
sem <- struct{}{}
defer func() { <-sem }()

for i := 0; i < k.volume; i++ {
for _, url := range k.urls {
body, err := k.makeRequest(ctx, url)
if err != nil {
log.Printf("error making request: %v", err)
} else {
log.Printf("response from %s: %s\n", url, string(body))
}
ip := serviceController.getIP()

url := fmt.Sprintf("http://%s:%d/", ip, k.port)

body, err := k.makeRequest(ctx, url)
if err != nil {
log.Printf("error making request: %v", err)
} else {
log.Printf("response from %s: %s\n", url, string(body))
}

}
}()
}
Expand Down
111 changes: 111 additions & 0 deletions hack/tools/kapinger/clients/http_service_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package clients

import (
"context"
"fmt"
"slices"
"sync"
"time"

"log"

"golang.org/x/exp/rand"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type serviceController struct {
sync.RWMutex
serviceInformer cache.SharedIndexInformer

ips []string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although a slice is more memory-efficient, if the order of IPs is not important, have you considered using a map instead? The lookup time would be much faster, especially with large number of IPs. Using a map would also simplify the code for removeIP func.

Copy link

@MikeZappa87 MikeZappa87 Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on using net.IP instead of string? What stops me from putting 'wingding' as a value?

Edit: It looks like we are pulling this from clusterIP which is a string as well. One day the service resource will go away .... My comment is probably unimportant

}

func newServiceController(clientset kubernetes.Interface, labelselector string) (*serviceController, error) {
serviceInformer := informers.NewSharedInformerFactoryWithOptions(clientset, time.Hour*24,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelselector // Filter by label selector
}),
).Core().V1().Services().Informer()
c := &serviceController{
serviceInformer: serviceInformer,
}
serviceInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.serviceAdd,
UpdateFunc: c.serviceUpdate,
DeleteFunc: c.serviceDelete,
},
)

return c, nil
}

func (c *serviceController) run(ctx context.Context) error {
stopCh := make(chan struct{})
go func() {
<-ctx.Done()
close(stopCh)
}()
c.serviceInformer.Run(stopCh)
if !cache.WaitForCacheSync(ctx.Done(), c.serviceInformer.HasSynced) {
return fmt.Errorf("failed to sync")
}
return nil
}

func (c *serviceController) serviceAdd(obj interface{}) {
service := obj.(*v1.Service)
log.Printf("service %s/%s added with ip %s", service.Namespace, service.Name, service.Spec.ClusterIP)
c.addIP(service.Spec.ClusterIP)
}

func (c *serviceController) serviceUpdate(old, new interface{}) {
newsvc := new.(*v1.Service)
oldsvc := new.(*v1.Service)
log.Printf("service %s/%s updated with new ip %s", newsvc.Namespace, newsvc.Name, newsvc.Spec.ClusterIP)
c.removeIP(oldsvc.Spec.ClusterIP)
c.addIP(newsvc.Spec.ClusterIP)
}

func (c *serviceController) serviceDelete(obj interface{}) {
service := obj.(*v1.Service)
log.Printf("service %s/%s deleted", service.Namespace, service.Name)
c.removeIP(service.Spec.ClusterIP)
}

func (c *serviceController) getIP() string {
c.RLock()
defer c.RUnlock()
return c.ips[rand.Intn(len(c.ips))]
}

func (c *serviceController) addIP(ip string) {
c.Lock()
defer c.Unlock()
c.ips = append(c.ips, ip)
}

func (c *serviceController) removeIP(ip string) {
c.Lock()
defer c.Unlock()

// find the index of the ip
i := -1
for j, cip := range c.ips {
if cip == ip {
i = j
break
}
}
if i == -1 {
log.Printf("service with ip %s not found", ip)
return
}

c.ips = slices.Delete(c.ips, i, i+1)
c.ips = slices.Clip(c.ips)
}
4 changes: 3 additions & 1 deletion hack/tools/kapinger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/microsoft/retina/hack/tools/kapinger
go 1.22.5

require (
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
golang.org/x/sync v0.10.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
k8s.io/klog/v2 v2.120.1
)

require (
Expand All @@ -19,6 +21,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand All @@ -38,7 +41,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
6 changes: 4 additions & 2 deletions hack/tools/kapinger/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down Expand Up @@ -111,8 +113,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
2 changes: 1 addition & 1 deletion hack/tools/kapinger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
g.Go(func() error {
err = client.MakeRequests(gCtx)
if err != nil {
return fmt.Errorf("error making request: %w", err)
return fmt.Errorf("error making client request: %w", err)
}
return nil
})
Expand Down
1 change: 1 addition & 0 deletions hack/tools/kapinger/servers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"strconv"
)

Expand Down
2 changes: 1 addition & 1 deletion hack/tools/kapinger/servers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import (

func getResponse(addressString, protocol string) []byte {
podname := os.Getenv("POD_NAME")
return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v", podname, protocol, addressString))
return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v\n", podname, protocol, addressString))
}
33 changes: 28 additions & 5 deletions test/e2e/framework/kubernetes/create-kapinger-deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (
KapingerTCPPort = 8085
KapingerUDPPort = 8086
MaxAffinityWeight = 100

KapingerBurstVolume = 1
KapingerBurstIntervalMs = 500
)

type CreateKapingerDeployment struct {
Expand Down Expand Up @@ -135,7 +138,7 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment {
Image: "acnpublic.azurecr.io/kapinger:20241014.7",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
"memory": resource.MustParse("20Mi"),
"memory": resource.MustParse("40Mi"),
},
Limits: v1.ResourceList{
"memory": resource.MustParse("20Mi"),
Expand All @@ -147,14 +150,26 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment {
},
},
Env: []v1.EnvVar{
{
Name: "GODEBUG",
Value: "netdns=go",
},
{
Name: "TARGET_TYPE",
Value: "service",
},
{
Name: "POD_NAME",
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: "GOMEMLIMIT",
ValueFrom: &v1.EnvVarSource{
ResourceFieldRef: &v1.ResourceFieldSelector{
Resource: "limits.memory",
},
},
},
{
Name: "HTTP_PORT",
Value: strconv.Itoa(KapingerHTTPPort),
Expand All @@ -167,6 +182,14 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment {
Name: "UDP_PORT",
Value: strconv.Itoa(KapingerUDPPort),
},
{
Name: "BURST_INTERVAL_MS",
Value: strconv.Itoa(KapingerBurstIntervalMs),
},
{
Name: "BURST_VOLUME",
Value: strconv.Itoa(KapingerBurstVolume),
},
},
},
},
Expand Down
22 changes: 17 additions & 5 deletions test/trafficgen/kapinger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ spec:
serviceAccountName: kapinger-sa
containers:
- name: kapinger
image: acnpublic.azurecr.io/kapinger:20241009.5
image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222
resources:
limits:
memory: 20Mi
memory: 40Mi
requests:
memory: 20Mi
env:
- name: GODEBUG
value: netdns=go
- name: TARGET_TYPE
value: "service"
- name: POD_IP
Expand All @@ -66,6 +64,11 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: GOMEMLIMIT
valueFrom:
resourceFieldRef:
divisor: "0"
resource: limits.memory
- name: HTTP_PORT
value: "8080"
- name: TCP_PORT
Expand Down Expand Up @@ -98,7 +101,7 @@ spec:
serviceAccountName: kapinger-sa
containers:
- name: kapinger
image: acnpublic.azurecr.io/kapinger:20241009.5
image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222
resources:
limits:
memory: 20Mi
Expand All @@ -115,12 +118,21 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: GOMEMLIMIT
valueFrom:
resourceFieldRef:
divisor: "0"
resource: limits.memory
- name: HTTP_PORT
value: "8080"
- name: TCP_PORT
value: "8085"
- name: UDP_PORT
value: "8086"
- name: BURST_INTERVAL_MS
value: "500"
- name: BURST_VOLUME
value: "1"
ports:
- containerPort: 8080
---
Expand Down
Loading