diff --git a/Makefile b/Makefile index 50d3d19cae..5e1023ccc7 100644 --- a/Makefile +++ b/Makefile @@ -333,6 +333,7 @@ kubectl-retina-image: CONTEXT_DIR=$(REPO_ROOT) kapinger-image: + #docker buildx create --name=retina --append docker buildx build --builder retina --platform windows/amd64 --target windows-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-windows-amd64 ./hack/tools/kapinger/ --push docker buildx build --builder retina --platform linux/amd64 --target linux-amd64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-amd64 ./hack/tools/kapinger/ --push docker buildx build --builder retina --platform linux/arm64 --target linux-arm64 -t $(IMAGE_REGISTRY)/$(KAPINGER_IMAGE):$(TAG)-linux-arm64 ./hack/tools/kapinger/ --push diff --git a/hack/tools/kapinger/clients/http.go b/hack/tools/kapinger/clients/http.go index 471fe2f00e..b45764387e 100644 --- a/hack/tools/kapinger/clients/http.go +++ b/hack/tools/kapinger/clients/http.go @@ -41,7 +41,7 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string Transport: &http.Transport{ DisableKeepAlives: true, }, - Timeout: 3 * time.Second, + Timeout: 15 * time.Second, }, labelselector: labelselector, clientset: clientset, @@ -80,22 +80,41 @@ func NewKapingerHTTPClient(clientset *kubernetes.Clientset, labelselector string func (k *KapingerHTTPClient) MakeRequests(ctx context.Context) error { ticker := time.NewTicker(k.interval) + sem := make(chan struct{}, 5) // semaphore to limit the number of active goroutines + + serviceController, err := newServiceController(k.clientset, k.labelselector) + if err != nil { + return fmt.Errorf("error creating service controller: %w", err) + } + + go serviceController.run(ctx) + for { select { case <-ctx.Done(): log.Printf("HTTP client context done") return nil case <-ticker.C: + if len(sem) >= cap(sem) { + log.Printf("max http clients reached, skipping interval") + continue + } go func() { + sem <- struct{}{} + defer func() { <-sem }() + for i := 0; i < k.volume; i++ { - for _, url := range k.urls { - body, err := k.makeRequest(ctx, url) - if err != nil { - log.Printf("error making request: %v", err) - } else { - log.Printf("response from %s: %s\n", url, string(body)) - } + ip := serviceController.getIP() + + url := fmt.Sprintf("http://%s:%d/", ip, k.port) + + body, err := k.makeRequest(ctx, url) + if err != nil { + log.Printf("error making request: %v", err) + } else { + log.Printf("response from %s: %s\n", url, string(body)) } + } }() } diff --git a/hack/tools/kapinger/clients/http_service_controller.go b/hack/tools/kapinger/clients/http_service_controller.go new file mode 100644 index 0000000000..a5f1075f6c --- /dev/null +++ b/hack/tools/kapinger/clients/http_service_controller.go @@ -0,0 +1,112 @@ +package clients + +import ( + "context" + "fmt" + "net" + "slices" + "sync" + "time" + + "log" + + "golang.org/x/exp/rand" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type serviceController struct { + sync.RWMutex + serviceInformer cache.SharedIndexInformer + + ips []net.IP +} + +func newServiceController(clientset kubernetes.Interface, labelselector string) (*serviceController, error) { + serviceInformer := informers.NewSharedInformerFactoryWithOptions(clientset, time.Hour*24, + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.LabelSelector = labelselector // Filter by label selector + }), + ).Core().V1().Services().Informer() + c := &serviceController{ + serviceInformer: serviceInformer, + } + serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.serviceAdd, + UpdateFunc: c.serviceUpdate, + DeleteFunc: c.serviceDelete, + }, + ) + + return c, nil +} + +func (c *serviceController) run(ctx context.Context) error { + stopCh := make(chan struct{}) + go func() { + <-ctx.Done() + close(stopCh) + }() + c.serviceInformer.Run(stopCh) + if !cache.WaitForCacheSync(ctx.Done(), c.serviceInformer.HasSynced) { + return fmt.Errorf("failed to sync") + } + return nil +} + +func (c *serviceController) serviceAdd(obj interface{}) { + service := obj.(*v1.Service) + log.Printf("service %s/%s added with ip %s", service.Namespace, service.Name, service.Spec.ClusterIP) + c.addIP(net.ParseIP(service.Spec.ClusterIP)) +} + +func (c *serviceController) serviceUpdate(old, new interface{}) { + newsvc := new.(*v1.Service) + oldsvc := new.(*v1.Service) + log.Printf("service %s/%s updated with new ip %s", newsvc.Namespace, newsvc.Name, newsvc.Spec.ClusterIP) + c.removeIP(net.ParseIP(oldsvc.Spec.ClusterIP)) + c.addIP(net.ParseIP(newsvc.Spec.ClusterIP)) +} + +func (c *serviceController) serviceDelete(obj interface{}) { + service := obj.(*v1.Service) + log.Printf("service %s/%s deleted", service.Namespace, service.Name) + c.removeIP(net.ParseIP(service.Spec.ClusterIP)) +} + +func (c *serviceController) getIP() net.IP { + c.RLock() + defer c.RUnlock() + return c.ips[rand.Intn(len(c.ips))] +} + +func (c *serviceController) addIP(ip net.IP) { + c.Lock() + defer c.Unlock() + c.ips = append(c.ips, ip) +} + +func (c *serviceController) removeIP(ip net.IP) { + c.Lock() + defer c.Unlock() + + // find the index of the ip + i := -1 + for j, cip := range c.ips { + if cip.Equal(ip) { + i = j + break + } + } + if i == -1 { + log.Printf("service with ip %s not found", ip) + return + } + + c.ips = slices.Delete(c.ips, i, i+1) + c.ips = slices.Clip(c.ips) +} diff --git a/hack/tools/kapinger/go.mod b/hack/tools/kapinger/go.mod index 55c98f344e..4ca7642ebd 100644 --- a/hack/tools/kapinger/go.mod +++ b/hack/tools/kapinger/go.mod @@ -3,10 +3,12 @@ module github.com/microsoft/retina/hack/tools/kapinger go 1.22.5 require ( + golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c golang.org/x/sync v0.10.0 k8s.io/api v0.30.2 k8s.io/apimachinery v0.30.2 k8s.io/client-go v0.30.2 + k8s.io/klog/v2 v2.120.1 ) require ( @@ -19,6 +21,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -38,7 +41,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/hack/tools/kapinger/go.sum b/hack/tools/kapinger/go.sum index e74bda0ddb..07fe0f1cc4 100644 --- a/hack/tools/kapinger/go.sum +++ b/hack/tools/kapinger/go.sum @@ -77,6 +77,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= +golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -111,8 +113,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= +golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/hack/tools/kapinger/main.go b/hack/tools/kapinger/main.go index 18f3787bd4..013a2d942f 100644 --- a/hack/tools/kapinger/main.go +++ b/hack/tools/kapinger/main.go @@ -54,7 +54,7 @@ func main() { g.Go(func() error { err = client.MakeRequests(gCtx) if err != nil { - return fmt.Errorf("error making request: %w", err) + return fmt.Errorf("error making client request: %w", err) } return nil }) diff --git a/hack/tools/kapinger/servers/http.go b/hack/tools/kapinger/servers/http.go index 5810c5e4d5..9abdaad883 100644 --- a/hack/tools/kapinger/servers/http.go +++ b/hack/tools/kapinger/servers/http.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net/http" + _ "net/http/pprof" "strconv" ) diff --git a/test/e2e/framework/kubernetes/create-kapinger-deployment.go b/test/e2e/framework/kubernetes/create-kapinger-deployment.go index 06862e1c09..22b0818f96 100644 --- a/test/e2e/framework/kubernetes/create-kapinger-deployment.go +++ b/test/e2e/framework/kubernetes/create-kapinger-deployment.go @@ -21,6 +21,9 @@ const ( KapingerTCPPort = 8085 KapingerUDPPort = 8086 MaxAffinityWeight = 100 + + KapingerBurstVolume = 1 + KapingerBurstIntervalMs = 500 ) type CreateKapingerDeployment struct { @@ -135,7 +138,7 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { Image: "acnpublic.azurecr.io/kapinger:20241014.7", Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ - "memory": resource.MustParse("20Mi"), + "memory": resource.MustParse("40Mi"), }, Limits: v1.ResourceList{ "memory": resource.MustParse("20Mi"), @@ -147,14 +150,26 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { }, }, Env: []v1.EnvVar{ - { - Name: "GODEBUG", - Value: "netdns=go", - }, { Name: "TARGET_TYPE", Value: "service", }, + { + Name: "POD_NAME", + ValueFrom: &v1.EnvVarSource{ + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "GOMEMLIMIT", + ValueFrom: &v1.EnvVarSource{ + ResourceFieldRef: &v1.ResourceFieldSelector{ + Resource: "limits.memory", + }, + }, + }, { Name: "HTTP_PORT", Value: strconv.Itoa(KapingerHTTPPort), @@ -167,6 +182,14 @@ func (c *CreateKapingerDeployment) GetKapingerDeployment() *appsv1.Deployment { Name: "UDP_PORT", Value: strconv.Itoa(KapingerUDPPort), }, + { + Name: "BURST_INTERVAL_MS", + Value: strconv.Itoa(KapingerBurstIntervalMs), + }, + { + Name: "BURST_VOLUME", + Value: strconv.Itoa(KapingerBurstVolume), + }, }, }, }, diff --git a/test/e2e/framework/types/job.go b/test/e2e/framework/types/job.go index 38b7194ab9..1da5f8e2f2 100644 --- a/test/e2e/framework/types/job.go +++ b/test/e2e/framework/types/job.go @@ -10,7 +10,7 @@ import ( var ( ErrEmptyDescription = fmt.Errorf("job description is empty") ErrNonNilError = fmt.Errorf("expected error to be non-nil") - ErrNilError = fmt.Errorf("expected error to be nil") + ErrNilError = fmt.Errorf("test ") ErrMissingParameter = fmt.Errorf("missing parameter") ErrParameterAlreadySet = fmt.Errorf("parameter already set") ErrOrphanSteps = fmt.Errorf("background steps with no corresponding stop") diff --git a/test/trafficgen/kapinger.yaml b/test/trafficgen/kapinger.yaml index b1476447e5..17ab42c5a1 100644 --- a/test/trafficgen/kapinger.yaml +++ b/test/trafficgen/kapinger.yaml @@ -47,15 +47,13 @@ spec: serviceAccountName: kapinger-sa containers: - name: kapinger - image: acnpublic.azurecr.io/kapinger:20241009.5 + image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222 resources: limits: - memory: 20Mi + memory: 40Mi requests: memory: 20Mi env: - - name: GODEBUG - value: netdns=go - name: TARGET_TYPE value: "service" - name: POD_IP @@ -66,6 +64,11 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: GOMEMLIMIT + valueFrom: + resourceFieldRef: + divisor: "0" + resource: limits.memory - name: HTTP_PORT value: "8080" - name: TCP_PORT @@ -98,7 +101,7 @@ spec: serviceAccountName: kapinger-sa containers: - name: kapinger - image: acnpublic.azurecr.io/kapinger:20241009.5 + image: acnpublic.azurecr.io/kapinger:v0.0.23-9-g23ef222 resources: limits: memory: 20Mi @@ -115,12 +118,21 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: GOMEMLIMIT + valueFrom: + resourceFieldRef: + divisor: "0" + resource: limits.memory - name: HTTP_PORT value: "8080" - name: TCP_PORT value: "8085" - name: UDP_PORT value: "8086" + - name: BURST_INTERVAL_MS + value: "500" + - name: BURST_VOLUME + value: "1" ports: - containerPort: 8080 ---