From 648149f80362779a119838e446de6f8447efb558 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Wed, 29 Jan 2025 17:09:31 +0000 Subject: [PATCH 1/4] service controller --- Makefile | 1 + hack/tools/kapinger/clients/http.go | 35 +++++-- .../clients/http_service_controller.go | 97 +++++++++++++++++++ hack/tools/kapinger/go.mod | 4 +- hack/tools/kapinger/go.sum | 6 +- hack/tools/kapinger/main.go | 2 +- hack/tools/kapinger/servers/http.go | 1 + hack/tools/kapinger/servers/util.go | 2 +- 8 files changed, 135 insertions(+), 13 deletions(-) create mode 100644 hack/tools/kapinger/clients/http_service_controller.go diff --git a/Makefile b/Makefile index 50d3d19cae..5e1023ccc7 100644 --- a/Makefile +++ b/Makefile @@ -333,6 +333,7 @@ kubectl-retina-image: CONTEXT_DIR=$(REPO_ROOT) kapinger-image: + #docker buildx create --name=retina --append docker buildx build --builder retina --platform windows/amd64 --target windows-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-windows-amd64 ./hack/tools/kapinger/ --push docker buildx build --builder retina --platform linux/amd64 --target linux-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-amd64 ./hack/tools/kapinger/ --push docker buildx build --builder retina --platform linux/arm64 --target linux-arm64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-arm64 ./hack/tools/kapinger/ --push diff --git a/hack/tools/kapinger/clients/http.go b/hack/tools/kapinger/clients/http.go index 471fe2f00e..13445a740b 100644 --- a/hack/tools/kapinger/clients/http.go +++ b/hack/tools/kapinger/clients/http.go @@ -41,7 +41,7 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string Transport: &http.Transport{ DisableKeepAlives: true, }, - Timeout: 3 * time.Second, + Timeout: 15 * time.Second, }, labelselector: labelselector, clientset: clientset, @@ -80,22 +80,41 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string func (k *KapingerHTTPClient) MakeRequests(ctx context.Context) error { ticker := time.NewTicker(k.interval) + sem := make(chan struct{}, 5) // semaphore to limit the number of active goroutines + + serviceController, err := NewServiceLoggingController(k.clientset, k.labelselector) + if err != nil { + return fmt.Errorf("error creating service controller: %w", err) + } + + go serviceController.Run(ctx) + for { select { case <-ctx.Done(): log.Printf("HTTP client context done") return nil case <-ticker.C: + if len(sem) >= cap(sem) { + log.Printf("max http clients reached, skipping interval") + continue + } go func() { + sem <- struct{}{} + defer func() { <-sem }() + for i := 0; i < k.volume; i++ { - for _, url := range k.urls { - body, err := k.makeRequest(ctx, url) - if err != nil { - log.Printf("error making request: %v", err) - } else { - log.Printf("response from %s: %s\n", url, string(body)) - } + ip := serviceController.getIP() + + url := fmt.Sprintf("http://%s:%d/", ip, k.port) + + body, err := k.makeRequest(ctx, url) + if err != nil { + log.Printf("error making request: %v", err) + } else { + log.Printf("response from %s: %s\n", url, string(body)) } + } }() } diff --git a/hack/tools/kapinger/clients/http_service_controller.go b/hack/tools/kapinger/clients/http_service_controller.go new file mode 100644 index 0000000000..4af0532740 --- /dev/null +++ b/hack/tools/kapinger/clients/http_service_controller.go @@ -0,0 +1,97 @@ +package clients + +import ( + "context" + "fmt" + "sync" + "time" + + "golang.org/x/exp/rand" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +type ServiceLoggingController struct { + sync.RWMutex + serviceInformer cache.SharedIndexInformer + + ips map[string]string +} + +func (c *ServiceLoggingController) Run(ctx context.Context) error { + stopCh := make(chan struct{}) + go func() { + <-ctx.Done() + close(stopCh) + }() + c.serviceInformer.Run(stopCh) + if !cache.WaitForCacheSync(ctx.Done(), c.serviceInformer.HasSynced) { + return fmt.Errorf("failed to sync") + } + return nil +} + +func (c *ServiceLoggingController) serviceAdd(obj interface{}) { + service := obj.(*v1.Service) + klog.Infof("SERVICE CREATED: %s/%s", service.Namespace, service.Name) + c.Lock() + defer c.Unlock() + c.ips[service.Name] = service.Spec.ClusterIP +} + +func (c *ServiceLoggingController) serviceUpdate(old, new interface{}) { + newsvc := new.(*v1.Service) + oldsvc := new.(*v1.Service) + klog.Infof("SERVICE UPDATED. %s/%s", newsvc.Namespace, newsvc.Name) + c.Lock() + defer c.Unlock() + delete(c.ips, oldsvc.Name) + c.ips[newsvc.Name] = newsvc.Spec.ClusterIP +} + +func (c *ServiceLoggingController) serviceDelete(obj interface{}) { + service := obj.(*v1.Service) + klog.Infof("SERVICE DELETED: %s/%s", service.Namespace, service.Name) + c.Lock() + defer c.Unlock() + delete(c.ips, service.Name) +} + +func (c *ServiceLoggingController) getIP() string { + c.RLock() + defer c.RUnlock() + // select random ip from map + randIndex := rand.Intn(len(c.ips)) + for key := range c.ips { + randIndex-- + if randIndex == 0 { + return c.ips[key] + } + } + return "" +} + +func NewServiceLoggingController(clientset kubernetes.Interface, labelselector string) (*ServiceLoggingController, error) { + serviceInformer := informers.NewSharedInformerFactoryWithOptions(clientset, time.Hour*24, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = labelselector // Filter by label selector + }), + ).Core().V1().Services().Informer() + c := &ServiceLoggingController{ + serviceInformer: serviceInformer, + } + serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.serviceAdd, + UpdateFunc: c.serviceUpdate, + DeleteFunc: c.serviceDelete, + }, + ) + + c.ips = make(map[string]string) + return c, nil +} diff --git a/hack/tools/kapinger/go.mod b/hack/tools/kapinger/go.mod index 55c98f344e..4ca7642ebd 100644 --- a/hack/tools/kapinger/go.mod +++ b/hack/tools/kapinger/go.mod @@ -3,10 +3,12 @@ module github.com/microsoft/retina/hack/tools/kapinger go 1.22.5 require ( + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c golang.org/x/sync v0.10.0 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/client-go v0.30.2 + k8s.io/klog/v2 v2.120.1 ) require ( @@ -19,6 +21,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -38,7 +41,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/hack/tools/kapinger/go.sum b/hack/tools/kapinger/go.sum index e74bda0ddb..07fe0f1cc4 100644 --- a/hack/tools/kapinger/go.sum +++ b/hack/tools/kapinger/go.sum @@ -77,6 +77,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -111,8 +113,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= +golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/hack/tools/kapinger/main.go b/hack/tools/kapinger/main.go index 18f3787bd4..013a2d942f 100644 --- a/hack/tools/kapinger/main.go +++ b/hack/tools/kapinger/main.go @@ -54,7 +54,7 @@ func main() { g.Go(func() error { err = client.MakeRequests(gCtx) if err != nil { - return fmt.Errorf("error making request: %w", err) + return fmt.Errorf("error making client request: %w", err) } return nil }) diff --git a/hack/tools/kapinger/servers/http.go b/hack/tools/kapinger/servers/http.go index 5810c5e4d5..9abdaad883 100644 --- a/hack/tools/kapinger/servers/http.go +++ b/hack/tools/kapinger/servers/http.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net/http" + _ "net/http/pprof" "strconv" ) diff --git a/hack/tools/kapinger/servers/util.go b/hack/tools/kapinger/servers/util.go index 497c90071e..d81662b5a0 100644 --- a/hack/tools/kapinger/servers/util.go +++ b/hack/tools/kapinger/servers/util.go @@ -7,5 +7,5 @@ import ( func getResponse(addressString, protocol string) []byte { podname := os.Getenv("POD_NAME") - return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v", podname, protocol, addressString)) + return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v\n", podname, protocol, addressString)) } From 23ef2225eb43224ca33f19b81b05252caf4dbd70 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Wed, 29 Jan 2025 18:29:55 +0000 Subject: [PATCH 2/4] project limits as GOMEMLIMIT --- hack/tools/kapinger/clients/http.go | 4 +- .../clients/http_service_controller.go | 108 ++++++++++-------- .../kubernetes/create-kapinger-deployment.go | 33 +++++- test/trafficgen/kapinger.yaml | 18 ++- 4 files changed, 106 insertions(+), 57 deletions(-) diff --git a/hack/tools/kapinger/clients/http.go b/hack/tools/kapinger/clients/http.go index 13445a740b..b45764387e 100644 --- a/hack/tools/kapinger/clients/http.go +++ b/hack/tools/kapinger/clients/http.go @@ -82,12 +82,12 @@ func (k *KapingerHTTPClient) MakeRequests(ctx context.Context) error { ticker := time.NewTicker(k.interval) sem := make(chan struct{}, 5) // semaphore to limit the number of active goroutines - serviceController, err := NewServiceLoggingController(k.clientset, k.labelselector) + serviceController, err := newServiceController(k.clientset, k.labelselector) if err != nil { return fmt.Errorf("error creating service controller: %w", err) } - go serviceController.Run(ctx) + go serviceController.run(ctx) for { select { diff --git a/hack/tools/kapinger/clients/http_service_controller.go b/hack/tools/kapinger/clients/http_service_controller.go index 4af0532740..aeac2ed68a 100644 --- a/hack/tools/kapinger/clients/http_service_controller.go +++ b/hack/tools/kapinger/clients/http_service_controller.go @@ -3,26 +3,48 @@ package clients import ( "context" "fmt" + "slices" "sync" "time" + "log" + "golang.org/x/exp/rand" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" ) -type ServiceLoggingController struct { +type serviceController struct { sync.RWMutex serviceInformer cache.SharedIndexInformer - ips map[string]string + ips []string } -func (c *ServiceLoggingController) Run(ctx context.Context) error { +func newServiceController(clientset kubernetes.Interface, labelselector string) (*serviceController, error) { + serviceInformer := informers.NewSharedInformerFactoryWithOptions(clientset, time.Hour*24, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = labelselector // Filter by label selector + }), + ).Core().V1().Services().Informer() + c := &serviceController{ + serviceInformer: serviceInformer, + } + serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.serviceAdd, + UpdateFunc: c.serviceUpdate, + DeleteFunc: c.serviceDelete, + }, + ) + + return c, nil +} + +func (c *serviceController) run(ctx context.Context) error { stopCh := make(chan struct{}) go func() { <-ctx.Done() @@ -35,63 +57,55 @@ func (c *ServiceLoggingController) Run(ctx context.Context) error { return nil } -func (c *ServiceLoggingController) serviceAdd(obj interface{}) { +func (c *serviceController) serviceAdd(obj interface{}) { service := obj.(*v1.Service) - klog.Infof("SERVICE CREATED: %s/%s", service.Namespace, service.Name) - c.Lock() - defer c.Unlock() - c.ips[service.Name] = service.Spec.ClusterIP + log.Printf("service %s/%s added with ip %s", service.Namespace, service.Name, service.Spec.ClusterIP) + c.addIP(service.Spec.ClusterIP) } -func (c *ServiceLoggingController) serviceUpdate(old, new interface{}) { +func (c *serviceController) serviceUpdate(old, new interface{}) { newsvc := new.(*v1.Service) oldsvc := new.(*v1.Service) - klog.Infof("SERVICE UPDATED. %s/%s", newsvc.Namespace, newsvc.Name) - c.Lock() - defer c.Unlock() - delete(c.ips, oldsvc.Name) - c.ips[newsvc.Name] = newsvc.Spec.ClusterIP + log.Printf("service %s/%s updated with new ip %s", newsvc.Namespace, newsvc.Name, newsvc.Spec.ClusterIP) + c.removeIP(oldsvc.Spec.ClusterIP) + c.addIP(newsvc.Spec.ClusterIP) } -func (c *ServiceLoggingController) serviceDelete(obj interface{}) { +func (c *serviceController) serviceDelete(obj interface{}) { service := obj.(*v1.Service) - klog.Infof("SERVICE DELETED: %s/%s", service.Namespace, service.Name) - c.Lock() - defer c.Unlock() - delete(c.ips, service.Name) + log.Printf("service %s/%s deleted", service.Namespace, service.Name) + c.removeIP(service.Spec.ClusterIP) } -func (c *ServiceLoggingController) getIP() string { +func (c *serviceController) getIP() string { c.RLock() defer c.RUnlock() - // select random ip from map - randIndex := rand.Intn(len(c.ips)) - for key := range c.ips { - randIndex-- - if randIndex == 0 { - return c.ips[key] - } - } - return "" + return c.ips[rand.Intn(len(c.ips))] } -func NewServiceLoggingController(clientset kubernetes.Interface, labelselector string) (*ServiceLoggingController, error) { - serviceInformer := informers.NewSharedInformerFactoryWithOptions(clientset, time.Hour*24, - informers.WithTweakListOptions(func(options *metav1.ListOptions) { - options.LabelSelector = labelselector // Filter by label selector - }), - ).Core().V1().Services().Informer() - c := &ServiceLoggingController{ - serviceInformer: serviceInformer, +func (c *serviceController) addIP(ip string) { + c.Lock() + defer c.Unlock() + c.ips = append(c.ips, ip) +} + +func (c *serviceController) removeIP(ip string) { + c.Lock() + defer c.Unlock() + + // find the index of the ip + i := -1 + for j, cip := range c.ips { + if cip == ip { + i = j + break + } + } + if i == -1 { + log.Printf("service with ip %s not found", ip) + return } - serviceInformer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.serviceAdd, - UpdateFunc: c.serviceUpdate, - DeleteFunc: c.serviceDelete, - }, - ) - c.ips = make(map[string]string) - return c, nil + c.ips = slices.Delete(c.ips, i, i+1) + c.ips = slices.Clip(c.ips) } diff --git a/test/e2e/framework/kubernetes/create-kapinger-deployment.go b/test/e2e/framework/kubernetes/create-kapinger-deployment.go index 06862e1c09..22b0818f96 100644 --- a/test/e2e/framework/kubernetes/create-kapinger-deployment.go +++ b/test/e2e/framework/kubernetes/create-kapinger-deployment.go @@ -21,6 +21,9 @@ const ( KapingerTCPPort = 8085 KapingerUDPPort = 8086 MaxAffinityWeight = 100 + + KapingerBurstVolume = 1 + KapingerBurstIntervalMs = 500 ) type CreateKapingerDeployment struct { @@ -135,7 +138,7 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { Image: "acnpublic.azurecr.io/kapinger:20241014.7", Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ - "memory": resource.MustParse("20Mi"), + "memory": resource.MustParse("40Mi"), }, Limits: v1.ResourceList{ "memory": resource.MustParse("20Mi"), @@ -147,14 +150,26 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { }, }, Env: []v1.EnvVar{ - { - Name: "GODEBUG", - Value: "netdns=go", - }, { Name: "TARGET_TYPE", Value: "service", }, + { + Name: "POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "GOMEMLIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + Resource: "limits.memory", + }, + }, + }, { Name: "HTTP_PORT", Value: strconv.Itoa(KapingerHTTPPort), @@ -167,6 +182,14 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { Name: "UDP_PORT", Value: strconv.Itoa(KapingerUDPPort), }, + { + Name: "BURST_INTERVAL_MS", + Value: strconv.Itoa(KapingerBurstIntervalMs), + }, + { + Name: "BURST_VOLUME", + Value: strconv.Itoa(KapingerBurstVolume), + }, }, }, }, diff --git a/test/trafficgen/kapinger.yaml b/test/trafficgen/kapinger.yaml index b1476447e5..dc67aa4498 100644 --- a/test/trafficgen/kapinger.yaml +++ b/test/trafficgen/kapinger.yaml @@ -50,12 +50,10 @@ spec: image: acnpublic.azurecr.io/kapinger:20241009.5 resources: limits: - memory: 20Mi + memory: 40Mi requests: memory: 20Mi env: - - name: GODEBUG - value: netdns=go - name: TARGET_TYPE value: "service" - name: POD_IP @@ -66,6 +64,11 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: GOMEMLIMIT + valueFrom: + resourceFieldRef: + divisor: "0" + resource: limits.memory - name: HTTP_PORT value: "8080" - name: TCP_PORT @@ -115,12 +118,21 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: GOMEMLIMIT + valueFrom: + resourceFieldRef: + divisor: "0" + resource: limits.memory - name: HTTP_PORT value: "8080" - name: TCP_PORT value: "8085" - name: UDP_PORT value: "8086" + - name: BURST_INTERVAL_MS + value: "500" + - name: BURST_VOLUME + value: "1" ports: - containerPort: 8080 --- From abc1bbb7ebf8f85c1fa730ee9a10e12438edd601 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Wed, 29 Jan 2025 18:54:12 +0000 Subject: [PATCH 3/4] use commit sha for tag --- test/trafficgen/kapinger.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/trafficgen/kapinger.yaml b/test/trafficgen/kapinger.yaml index dc67aa4498..17ab42c5a1 100644 --- a/test/trafficgen/kapinger.yaml +++ b/test/trafficgen/kapinger.yaml @@ -47,7 +47,7 @@ spec: serviceAccountName: kapinger-sa containers: - name: kapinger - image: acnpublic.azurecr.io/kapinger:20241009.5 + image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222 resources: limits: memory: 40Mi @@ -101,7 +101,7 @@ spec: serviceAccountName: kapinger-sa containers: - name: kapinger - image: acnpublic.azurecr.io/kapinger:20241009.5 + image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222 resources: limits: memory: 20Mi From 209f70a7824bc55878cca964a70594382ac3562a Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Mon, 3 Feb 2025 19:21:01 +0000 Subject: [PATCH 4/4] use net.IP --- .../clients/http_service_controller.go | 21 ++++++++++--------- hack/tools/kapinger/servers/util.go | 2 +- test/e2e/framework/types/job.go | 2 +- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/hack/tools/kapinger/clients/http_service_controller.go b/hack/tools/kapinger/clients/http_service_controller.go index aeac2ed68a..a5f1075f6c 100644 --- a/hack/tools/kapinger/clients/http_service_controller.go +++ b/hack/tools/kapinger/clients/http_service_controller.go @@ -3,6 +3,7 @@ package clients import ( "context" "fmt" + "net" "slices" "sync" "time" @@ -21,7 +22,7 @@ type serviceController struct { sync.RWMutex serviceInformer cache.SharedIndexInformer - ips []string + ips []net.IP } func newServiceController(clientset kubernetes.Interface, labelselector string) (*serviceController, error) { @@ -60,43 +61,43 @@ func (c *serviceController) run(ctx context.Context) error { func (c *serviceController) serviceAdd(obj interface{}) { service := obj.(*v1.Service) log.Printf("service %s/%s added with ip %s", service.Namespace, service.Name, service.Spec.ClusterIP) - c.addIP(service.Spec.ClusterIP) + c.addIP(net.ParseIP(service.Spec.ClusterIP)) } func (c *serviceController) serviceUpdate(old, new interface{}) { newsvc := new.(*v1.Service) oldsvc := new.(*v1.Service) log.Printf("service %s/%s updated with new ip %s", newsvc.Namespace, newsvc.Name, newsvc.Spec.ClusterIP) - c.removeIP(oldsvc.Spec.ClusterIP) - c.addIP(newsvc.Spec.ClusterIP) + c.removeIP(net.ParseIP(oldsvc.Spec.ClusterIP)) + c.addIP(net.ParseIP(newsvc.Spec.ClusterIP)) } func (c *serviceController) serviceDelete(obj interface{}) { service := obj.(*v1.Service) log.Printf("service %s/%s deleted", service.Namespace, service.Name) - c.removeIP(service.Spec.ClusterIP) + c.removeIP(net.ParseIP(service.Spec.ClusterIP)) } -func (c *serviceController) getIP() string { +func (c *serviceController) getIP() net.IP { c.RLock() defer c.RUnlock() return c.ips[rand.Intn(len(c.ips))] } -func (c *serviceController) addIP(ip string) { +func (c *serviceController) addIP(ip net.IP) { c.Lock() defer c.Unlock() c.ips = append(c.ips, ip) } -func (c *serviceController) removeIP(ip string) { +func (c *serviceController) removeIP(ip net.IP) { c.Lock() defer c.Unlock() - + // find the index of the ip i := -1 for j, cip := range c.ips { - if cip == ip { + if cip.Equal(ip) { i = j break } diff --git a/hack/tools/kapinger/servers/util.go b/hack/tools/kapinger/servers/util.go index d81662b5a0..497c90071e 100644 --- a/hack/tools/kapinger/servers/util.go +++ b/hack/tools/kapinger/servers/util.go @@ -7,5 +7,5 @@ import ( func getResponse(addressString, protocol string) []byte { podname := os.Getenv("POD_NAME") - return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v\n", podname, protocol, addressString)) + return []byte(fmt.Sprintf("connected to: %s via %s, connected from: %v", podname, protocol, addressString)) } diff --git a/test/e2e/framework/types/job.go b/test/e2e/framework/types/job.go index 38b7194ab9..1da5f8e2f2 100644 --- a/test/e2e/framework/types/job.go +++ b/test/e2e/framework/types/job.go @@ -10,7 +10,7 @@ import ( var ( ErrEmptyDescription = fmt.Errorf("job description is empty") ErrNonNilError = fmt.Errorf("expected error to be non-nil") - ErrNilError = fmt.Errorf("expected error to be nil") + ErrNilError = fmt.Errorf("test ") ErrMissingParameter = fmt.Errorf("missing parameter") ErrParameterAlreadySet = fmt.Errorf("parameter already set") ErrOrphanSteps = fmt.Errorf("background steps with no corresponding stop")