From 59e5dbaf38b486a2722c97cdd871225c0b5d2ac1 Mon Sep 17 00:00:00 2001 From: Quang Nguyen Date: Thu, 16 Jan 2025 07:49:55 -0500 Subject: [PATCH] feat: veth watcher --- .gitignore | 2 + Makefile | 3 + docs/02-Installation/01-Setup.md | 4 +- docs/02-Installation/03-Config.md | 59 +++- docs/02-Installation/04-prometheus.md | 9 +- go.mod | 34 +- go.sum | 76 ++--- pkg/managers/pluginmanager/pluginmanager.go | 19 +- .../pluginmanager/pluginmanager_test.go | 6 +- .../watchermanager/mocks/mock_types.go | 88 +++--- pkg/managers/watchermanager/types.go | 19 +- pkg/managers/watchermanager/watchermanager.go | 69 ++-- .../watchermanager/watchermanager_test.go | 47 +-- pkg/plugin/packetparser/packetparser_linux.go | 29 +- pkg/plugin/packetparser/types_linux.go | 2 +- pkg/watchers/apiserver/apiserver.go | 294 ++++++++---------- pkg/watchers/apiserver/apiserver_test.go | 141 ++------- pkg/watchers/apiserver/types.go | 38 ++- pkg/watchers/endpoint/endpoint.go | 104 ------- pkg/watchers/endpoint/endpoint_linux.go | 54 ---- pkg/watchers/endpoint/endpoint_linux_test.go | 228 -------------- pkg/watchers/endpoint/endpoint_windows.go | 18 -- pkg/watchers/endpoint/types.go | 21 ++ pkg/watchers/endpoint/watcher_linux.go | 64 ++++ pkg/watchers/endpoint/watcher_windows.go | 24 ++ test/managers/filtermanager/main.go | 22 +- test/plugin/packetparser/main_linux.go | 2 +- test/watchers/apiserver/main.go | 2 +- test/watchers/veth/main.go | 2 +- 29 files changed, 553 insertions(+), 927 deletions(-) delete mode 100644 pkg/watchers/endpoint/endpoint.go delete mode 100644 pkg/watchers/endpoint/endpoint_linux.go delete mode 100644 pkg/watchers/endpoint/endpoint_linux_test.go delete mode 100644 pkg/watchers/endpoint/endpoint_windows.go create mode 100644 pkg/watchers/endpoint/watcher_linux.go create mode 100644 pkg/watchers/endpoint/watcher_windows.go diff --git a/.gitignore b/.gitignore index 026134d26a..32c777c697 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,5 @@ image-metadata-*.json *results*.json netperf-*.json netperf-*.csv + +.certs/ diff --git a/Makefile b/Makefile index 50d3d19cae..14db13ab96 100644 --- a/Makefile +++ b/Makefile @@ -549,6 +549,9 @@ get-certs: hubble config set tls true hubble config set tls-server-name instance.hubble-relay.cilium.io +# Replaces every '.' in $(1) with '\.' +escape_dot = $(subst .,\.,$(1)) + .PHONY: clean-certs clean-certs: rm -rf $(CERT_DIR) diff --git a/docs/02-Installation/01-Setup.md b/docs/02-Installation/01-Setup.md index 34caeb1eca..5b436d21ad 100644 --- a/docs/02-Installation/01-Setup.md +++ b/docs/02-Installation/01-Setup.md @@ -6,7 +6,9 @@ Note: you can also run captures with just the [CLI](./02-CLI.md). ## Installation -Requires Helm version >= v3.8.0. +### Requirements + +- Helm version >= v3.8.0. ### Basic Mode diff --git a/docs/02-Installation/03-Config.md b/docs/02-Installation/03-Config.md index 23c893fd3c..799d25ad1b 100644 --- a/docs/02-Installation/03-Config.md +++ b/docs/02-Installation/03-Config.md @@ -2,25 +2,60 @@ ## Overview -To customize metrics and other options, modify the `retina-config` ConfigMap. Default settings for each component are specified in *deploy/legacy/manifests/controller/helm/retina/values.yaml*. +### Default Configuration -## Agent Config +Default settings for each component are specified in [Values file](../../deploy/legacy/manifests/controller/helm/retina/values.yaml). + +### Deployed Configuration + +Configuration of an active Retina deployment can be seen in `retina-config` and `retina-operator-config` configmaps. + +```shell +kubectl get configmap retina-config -n kube-system -o yaml +kubectl get configmap retina-operator-config -n kube-system -o yaml +``` + +### Updating Configuration + +If the Retina installation was done via Helm, configuration updates should be done via `helm upgrade` defining the specific attribute name and value as part of the command. + +The example below enables gathering of advance pod-level metrics. + +```shell +VERSION=$( curl -sL https://api.github.com/repos/microsoft/retina/releases/latest | jq -r .name) +helm upgrade --install retina oci://ghcr.io/microsoft/retina/charts/retina \ + --version $VERSION \ + --namespace kube-system \ + --set image.tag=$VERSION \ + --set operator.tag=$VERSION \ + --set logLevel=info \ + --set enabledPlugin_linux="\[dropreason\,packetforward\,linuxutil\,dns\]" + --set enablePodLevel=true +``` + +## General Configuration + +Apply to both Agent and Operator. * `enableTelemetry`: Enables telemetry for the agent for managed AKS clusters. Requires `buildinfo.ApplicationInsightsID` to be set if enabled. -* `enablePodLevel`: Enables gathering of advanced pod-level metrics, attaching pods' metadata to Retina's metrics. * `remoteContext`: Enables Retina to watch Pods on the cluster. -* `enableAnnotations`: Enables gathering of metrics for annotated resources. Resources can be annotated with `retina.sh=observe`. Requires the operator and `enableRetinaEndpoint` to be enabled. -* `enabledPlugin`: List of enabled plugins. + +## Agent Configuration + +* `logLevel`: Define the level of logs to store. +* `enabledPlugin_linux`: List of enabled plugins. * `metricsInterval`: Interval for gathering metrics (in seconds). (@deprecated, use `metricsIntervalDuration` instead) * `metricsIntervalDuration`: Interval for gathering metrics (in `time.Duration`). +* `enablePodLevel`: Enables gathering of advanced pod-level metrics, attaching pods' metadata to Retina's metrics. +* `enableConntrackMetrics`: Enables conntrack metrics for packets and bytes forwarded/received. +* `enableAnnotations`: Enables gathering of metrics for annotated resources. Resources can be annotated with `retina.sh=observe`. Requires the operator and `operator.enableRetinaEndpoint` to be enabled. * `bypassLookupIPOfInterest`: If true, plugins like `packetparser` and `dropreason` will bypass IP lookup, generating an event for each packet regardless. `enableAnnotations` will not work if this is true. * `dataAggregationLevel`: Defines the level of data aggregation for Retina. See [Data Aggregation](../05-Concepts/data-aggregation.md) for more details. -## Operator Config +## Operator Configuration -* `installCRDs`: Allows the operator to manage the installation of Retina-related CRDs. -* `enableTelemetry`: Enables telemetry for the operator in managed AKS clusters. Requires `buildinfo.ApplicationInsightsID` to be set if enabled. -* `captureDebug`: Toggles debug mode for captures. If true, the operator uses the image from the test container registry for the capture workload. Refer to *pkg/capture/utils/capture_image.go* for details on how the debug capture image version is selected. -* `captureJobNumLimit`: Sets the maximum number of jobs that can be created for each Capture. -* `enableRetinaEndpoint`: Allows the operator to monitor and update the cache with Pod metadata. -* `enableManagedStorageAccount`: Enables the use of a managed storage account for storing artifacts. +* `operator.installCRDs`: Allows the operator to manage the installation of Retina-related CRDs. +* `operator.enableRetinaEndpoint`: Allows the operator to monitor and update the cache with Pod metadata. +* `capture.captureDebug`: Toggles debug mode for captures. If true, the operator uses the image from the test container registry for the capture workload. Refer to [Capture Image file](../../pkg/capture/utils/capture_image.go) for details on how the debug capture image version is selected. +* `capture.captureJobNumLimit`: Sets the maximum number of jobs that can be created for each Capture. +* `capture.enableManagedStorageAccount`: Enables the use of a managed storage account for storing artifacts. diff --git a/docs/02-Installation/04-prometheus.md b/docs/02-Installation/04-prometheus.md index 781c610289..bebabb90b6 100644 --- a/docs/02-Installation/04-prometheus.md +++ b/docs/02-Installation/04-prometheus.md @@ -6,6 +6,7 @@ Prometheus is an open-source system monitoring and alerting toolkit originally b 1. Create a Kubernetes cluster. 2. Install Retina DaemonSet (see [Quick Installation](./01-Setup.md)). +3. Clone [Retina Repository](https://github.com/microsoft/retina) or download [Prometheus Values File](../../deploy/legacy/prometheus/values.yaml). ## Install Prometheus via Helm @@ -19,13 +20,17 @@ Prometheus is an open-source system monitoring and alerting toolkit originally b 1. Install the Prometheus chart ```shell - helm install prometheus -n kube-system -f deploy/legacy/prometheus/values.yaml prometheus-community/kube-prometheus-stack + # The value of VALUE_FILE_PATH is relative to the repo root folder. Update this according to the location of your file. + VALUE_FILE_PATH=deploy/legacy/prometheus/values.yaml + helm install prometheus -n kube-system -f $VALUE_FILE_PATH prometheus-community/kube-prometheus-stack ``` Or if you already have the chart installed, upgrade how you see fit, providing the new job name as an additional scrape config, ex: ```shell - helm upgrade prometheus -n kube-system -f deploy/legacy/prometheus/values.yaml prometheus-community/kube-prometheus-stack + # The value of VALUE_FILE_PATH is relative to the repo root folder. Update this according to the location of your file. + VALUE_FILE_PATH=deploy/legacy/prometheus/values.yaml + helm upgrade prometheus -n kube-system -f $VALUE_FILE_PATH prometheus-community/kube-prometheus-stack ``` > Note: Grafana and kube-state metrics may schedule on Windows nodes, the current chart doesn't have node affinity for those components. Some manual intervention may be required. diff --git a/go.mod b/go.mod index e23f21e18d..aa54766e00 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v1.3.1 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 // indirect github.com/BurntSushi/toml v1.3.2 // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect @@ -48,18 +48,18 @@ require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.23 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.27 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect - github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.8 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.8 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.8 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.24.9 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.8 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.33.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.10 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.8 // indirect github.com/aws/smithy-go v1.22.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect @@ -262,7 +262,7 @@ require ( golang.org/x/sync v0.10.0 golang.org/x/sys v0.29.0 golang.org/x/term v0.28.0 // indirect - google.golang.org/protobuf v1.36.1 + google.golang.org/protobuf v1.36.3 gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/api v0.30.3 @@ -279,7 +279,7 @@ require ( github.com/Azure/azure-container-networking/zapai v0.0.3 github.com/Azure/azure-sdk-for-go v68.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/dashboard/armdashboard v1.2.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 @@ -289,10 +289,10 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0 github.com/Microsoft/hcsshim v0.12.0-rc.3 github.com/Sytten/logrus-zap-hook v0.1.0 - github.com/aws/aws-sdk-go-v2 v1.32.8 - github.com/aws/aws-sdk-go-v2/config v1.28.11 - github.com/aws/aws-sdk-go-v2/credentials v1.17.52 - github.com/aws/aws-sdk-go-v2/service/s3 v1.72.2 + github.com/aws/aws-sdk-go-v2 v1.33.0 + github.com/aws/aws-sdk-go-v2/config v1.29.0 + github.com/aws/aws-sdk-go-v2/credentials v1.17.53 + github.com/aws/aws-sdk-go-v2/service/s3 v1.73.0 github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/cilium/cilium v1.16.0-pre.1.0.20240403152809-b9853ecbcaeb github.com/cilium/ebpf v0.16.0 diff --git a/go.sum b/go.sum index 5de7126432..198a78c413 100644 --- a/go.sum +++ b/go.sum @@ -15,10 +15,10 @@ github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0 github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 h1:B/dfvscEQtew9dVuoxqxrUKKv8Ih2f55PydknDamU+g= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0/go.mod h1:fiPSssYvltE08HJchL04dOy+RD4hgrjph0cwGGMntdI= -github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.0 h1:+m0M/LFxN43KvULkDNfdXOgrjtg6UYJPFBJyuEcRCAw= -github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.0/go.mod h1:PwOyop78lveYMRs6oCxjiVyBdyCgIYH6XHIVZO9/SFQ= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1 h1:1mvYtZfWQAnwNah/C+Z+Jb9rQH95LPE2vlmMuWAHJk8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.1/go.mod h1:75I/mXtme1JyWFtz8GocPHVFyH421IBoZErnO16dd0k= +github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1 h1:Bk5uOhSAenHyR5P61D/NzeQCv+4fEVV8mOkJ82NqpWw= +github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.1/go.mod h1:QZ4pw3or1WPmRBxf0cHd1tknzrT54WPBOQoGutCPvSU= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/authorization/armauthorization/v2 v2.2.0 h1:Hp+EScFOu9HeCbeW8WU2yQPJd4gGwhMgKxWe+G6jNzw= @@ -87,8 +87,8 @@ github.com/Azure/perf-tests/network/benchmarks/netperf v0.0.0-20241008140716-395 github.com/Azure/perf-tests/network/benchmarks/netperf v0.0.0-20241008140716-395a79947d2c/go.mod h1:jeV6A8q9uDVDwffTt5KBk+5g7bXfpEImYW6qLKn0E+I= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mod h1:tCcJZ0uHAmvjsVYzEFivsRTN00oz5BEsRgQHu5JZ9WE= -github.com/AzureAD/microsoft-authentication-library-for-go v1.3.1 h1:gUDtaZk8heteyfdmv+pcfHvhR9llnh7c7GMwZ8RVG04= -github.com/AzureAD/microsoft-authentication-library-for-go v1.3.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2 h1:kYRSnvJju5gYVyhkij+RTJ/VR6QIUaCfWeaFm2ycsjQ= +github.com/AzureAD/microsoft-authentication-library-for-go v1.3.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= @@ -129,40 +129,40 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3dyBCFEj5IhUbnKptjxatkF07cF2ak3yi77so= github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= -github.com/aws/aws-sdk-go-v2 v1.32.8 h1:cZV+NUS/eGxKXMtmyhtYPJ7Z4YLoI/V8bkTdRZfYhGo= -github.com/aws/aws-sdk-go-v2 v1.32.8/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2 v1.33.0 h1:Evgm4DI9imD81V0WwD+TN4DCwjUMdc94TrduMLbgZJs= +github.com/aws/aws-sdk-go-v2 v1.33.0/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= -github.com/aws/aws-sdk-go-v2/config v1.28.11 h1:7Ekru0IkRHRnSRWGQLnLN6i0o1Jncd0rHo2T130+tEQ= -github.com/aws/aws-sdk-go-v2/config v1.28.11/go.mod h1:x78TpPvBfHH16hi5tE3OCWQ0pzNfyXA349p5/Wp82Yo= -github.com/aws/aws-sdk-go-v2/credentials v1.17.52 h1:I4ymSk35LHogx2Re2Wu6LOHNTRaRWkLVoJgWS5Wd40M= -github.com/aws/aws-sdk-go-v2/credentials v1.17.52/go.mod h1:vAkqKbMNUcher8fDXP2Ge2qFXKMkcD74qvk1lJRMemM= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.23 h1:IBAoD/1d8A8/1aA8g4MBVtTRHhXRiNAgwdbo/xRM2DI= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.23/go.mod h1:vfENuCM7dofkgKpYzuzf1VT1UKkA/YL3qanfBn7HCaA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.27 h1:jSJjSBzw8VDIbWv+mmvBSP8ezsztMYJGH+eKqi9AmNs= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.27/go.mod h1:/DAhLbFRgwhmvJdOfSm+WwikZrCuUJiA4WgJG0fTNSw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.27 h1:l+X4K77Dui85pIj5foXDhPlnqcNRG2QUyvca300lXh8= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.27/go.mod h1:KvZXSFEXm6x84yE8qffKvT3x8J5clWnVFXphpohhzJ8= +github.com/aws/aws-sdk-go-v2/config v1.29.0 h1:Vk/u4jof33or1qAQLdofpjKV7mQQT7DcUpnYx8kdmxY= +github.com/aws/aws-sdk-go-v2/config v1.29.0/go.mod h1:iXAZK3Gxvpq3tA+B9WaDYpZis7M8KFgdrDPMmHrgbJM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.53 h1:lwrVhiEDW5yXsuVKlFVUnR2R50zt2DklhOyeLETqDuE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.53/go.mod h1:CkqM1bIw/xjEpBMhBnvqUXYZbpCFuj6dnCAyDk2AtAY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24 h1:5grmdTdMsovn9kPZPI23Hhvp0ZyNm5cRO+IZFIYiAfw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.24/go.mod h1:zqi7TVKTswH3Ozq28PkmBmgzG1tona7mo9G2IJg4Cis= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28 h1:igORFSiH3bfq4lxKFkTSYDhJEUCYo6C8VKiWJjYwQuQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.28/go.mod h1:3So8EA/aAYm36L7XIvCVwLa0s5N0P7o2b1oqnx/2R4g= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28 h1:1mOW9zAUMhTSrMDssEHS/ajx8JcAj/IcftzcmNlmVLI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.28/go.mod h1:kGlXVIWDfvt2Ox5zEaNglmq0hXPHgQFNMix33Tw22jA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.27 h1:AmB5QxnD+fBFrg9LcqzkgF/CaYvMyU/BTlejG4t1S7Q= -github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.27/go.mod h1:Sai7P3xTiyv9ZUYO3IFxMnmiIP759/67iQbU4kdmkyU= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28 h1:7kpeALOUeThs2kEjlAxlADAVfxKmkYAedlpZ3kdoSJ4= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.28/go.mod h1:pyaOYEdp1MJWgtXLy6q80r3DhsVdOIOZNB9hdTcJIvI= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.8 h1:iwYS40JnrBeA9e9aI5S6KKN4EB2zR4iUVYN0nwVivz4= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.8/go.mod h1:Fm9Mi+ApqmFiknZtGpohVcBGvpTu542VC4XO9YudRi0= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.8 h1:cWno7lefSH6Pp+mSznagKCgfDGeZRin66UvYUqAkyeA= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.8/go.mod h1:tPD+VjU3ABTBoEJ3nctu5Nyg4P4yjqSH5bJGGkY4+XE= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.8 h1:/Mn7gTedG86nbpjT4QEKsN1D/fThiYe1qvq7WsBGNHg= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.8/go.mod h1:Ae3va9LPmvjj231ukHB6UeT8nS7wTPfC3tMZSZMwNYg= -github.com/aws/aws-sdk-go-v2/service/s3 v1.72.2 h1:a7aQ3RW+ug4IbhoQp29NZdc7vqrzKZZfWZSaQAXOZvQ= -github.com/aws/aws-sdk-go-v2/service/s3 v1.72.2/go.mod h1:xMekrnhmJ5aqmyxtmALs7mlvXw5xRh+eYjOjvrIIFJ4= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.9 h1:YqtxripbjWb2QLyzRK9pByfEDvgg95gpC2AyDq4hFE8= -github.com/aws/aws-sdk-go-v2/service/sso v1.24.9/go.mod h1:lV8iQpg6OLOfBnqbGMBKYjilBlf633qwHnBEiMSPoHY= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.8 h1:6dBT1Lz8fK11m22R+AqfRsFn8320K0T5DTGxxOQBSMw= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.8/go.mod h1:/kiBvRQXBc6xeJTYzhSdGvJ5vm1tjaDEjH+MSeRJnlY= -github.com/aws/aws-sdk-go-v2/service/sts v1.33.7 h1:qwGa9MA8G7mBq2YphHFaygdPe5t9OA7SvaJdwWTlEds= -github.com/aws/aws-sdk-go-v2/service/sts v1.33.7/go.mod h1:+8h7PZb3yY5ftmVLD7ocEoE98hdc8PoKS0H3wfx1dlc= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.0 h1:pC19SLXdHsfXTvCwy3sHfiACXaSjRkKlOQYnaTk8loI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.5.0/go.mod h1:dIW8puxSbYLSPv/ju0d9A3CpwXdtqvJtYKDMVmPLOWE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9 h1:TQmKDyETFGiXVhZfQ/I0cCFziqqX58pi4tKJGYGFSz0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.9/go.mod h1:HVLPK2iHQBUx7HfZeOQSEu3v2ubZaAY2YPbAm5/WUyY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9 h1:2aInXbh02XsbO0KobPGMNXyv2QP73VDKsWPNJARj/+4= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.9/go.mod h1:dgXS1i+HgWnYkPXqNoPIPKeUsUUYHaUbThC90aDnNiE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.73.0 h1:sHF4brL/726nbTldh8GGDKFS5LsQ8FwOTKEyvKp9DB4= +github.com/aws/aws-sdk-go-v2/service/s3 v1.73.0/go.mod h1:rGHXqEgGFrz7j58tIGKKAfD1fJzYXeKkN/Jn3eIRZYE= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.10 h1:DyZUj3xSw3FR3TXSwDhPhuZkkT14QHBiacdbUVcD0Dg= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.10/go.mod h1:Ro744S4fKiCCuZECXgOi760TiYylUM8ZBf6OGiZzJtY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.9 h1:I1TsPEs34vbpOnR81GIcAq4/3Ud+jRHVGwx6qLQUHLs= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.9/go.mod h1:Fzsj6lZEb8AkTE5S68OhcbBqeWPsR8RnGuKPr8Todl8= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.8 h1:pqEJQtlKWvnv3B6VRt60ZmsHy3SotlEBvfUBPB1KVcM= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.8/go.mod h1:f6vjfZER1M17Fokn0IzssOTMT2N8ZSq+7jnNF0tArvw= github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -779,8 +779,8 @@ github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= -github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -1153,8 +1153,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/managers/pluginmanager/pluginmanager.go b/pkg/managers/pluginmanager/pluginmanager.go index 15d133382f..0501ae13fc 100644 --- a/pkg/managers/pluginmanager/pluginmanager.go +++ b/pkg/managers/pluginmanager/pluginmanager.go @@ -39,7 +39,7 @@ type PluginManager struct { plugins map[string]plugin.Plugin tel telemetry.Telemetry - watcherManager watchermanager.IWatcherManager + watcherManager watchermanager.Manager } func NewPluginManager(cfg *kcfg.Config, tel telemetry.Telemetry) (*PluginManager, error) { @@ -126,17 +126,20 @@ func (p *PluginManager) Start(ctx context.Context) error { return ErrZeroInterval } + g, ctx := errgroup.WithContext(ctx) + if p.cfg.EnablePodLevel { - p.l.Info("starting watchers") + g.Go(func() error { + p.l.Info("starting watchers") - // Start watcher manager - if err := p.watcherManager.Start(ctx); err != nil { - return errors.Wrap(err, "failed to start watcher manager") - } + // Start watcher manager + if err := p.watcherManager.Start(ctx); err != nil { + return errors.Wrap(err, "failed to start watcher manager") + } + return nil + }) } - g, ctx := errgroup.WithContext(ctx) - // run conntrack GC ct, err := conntrack.New() if err != nil { diff --git a/pkg/managers/pluginmanager/pluginmanager_test.go b/pkg/managers/pluginmanager/pluginmanager_test.go index 2203468b5b..9221ad39c3 100644 --- a/pkg/managers/pluginmanager/pluginmanager_test.go +++ b/pkg/managers/pluginmanager/pluginmanager_test.go @@ -38,8 +38,8 @@ var ( } ) -func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockIWatcherManager) { - m = watchermock.NewMockIWatcherManager(ctl) +func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockManager) { + m = watchermock.NewMockManager(ctl) m.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes() m.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() return @@ -456,7 +456,7 @@ func TestWatcherManagerFailure(t *testing.T) { defer ctl.Finish() log.SetupZapLogger(log.GetDefaultLogOpts()) - m := watchermock.NewMockIWatcherManager(ctl) + m := watchermock.NewMockManager(ctl) m.EXPECT().Start(gomock.Any()).Return(errors.New("error")).AnyTimes() cfg := cfgPodLevelEnabled diff --git a/pkg/managers/watchermanager/mocks/mock_types.go b/pkg/managers/watchermanager/mocks/mock_types.go index b848396086..4150e4b7a6 100644 --- a/pkg/managers/watchermanager/mocks/mock_types.go +++ b/pkg/managers/watchermanager/mocks/mock_types.go @@ -16,59 +16,59 @@ import ( gomock "go.uber.org/mock/gomock" ) -// MockIWatcher is a mock of IWatcher interface. -type MockIWatcher struct { +// MockWatcher is a mock of Watcher interface. +type MockWatcher struct { ctrl *gomock.Controller - recorder *MockIWatcherMockRecorder + recorder *MockWatcherMockRecorder } -// MockIWatcherMockRecorder is the mock recorder for MockIWatcher. -type MockIWatcherMockRecorder struct { - mock *MockIWatcher +// MockWatcherMockRecorder is the mock recorder for MockWatcher. +type MockWatcherMockRecorder struct { + mock *MockWatcher } -// NewMockIWatcher creates a new mock instance. -func NewMockIWatcher(ctrl *gomock.Controller) *MockIWatcher { - mock := &MockIWatcher{ctrl: ctrl} - mock.recorder = &MockIWatcherMockRecorder{mock} +// NewMockWatcher creates a new mock instance. +func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher { + mock := &MockWatcher{ctrl: ctrl} + mock.recorder = &MockWatcherMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockIWatcher) EXPECT() *MockIWatcherMockRecorder { +func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder { return m.recorder } -// Init mocks base method. -func (m *MockIWatcher) Init(ctx context.Context) error { +// Name mocks base method. +func (m *MockWatcher) Name() string { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Init", ctx) - ret0, _ := ret[0].(error) + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) return ret0 } -// Init indicates an expected call of Init. -func (mr *MockIWatcherMockRecorder) Init(ctx any) *gomock.Call { +// Name indicates an expected call of Name. +func (mr *MockWatcherMockRecorder) Name() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockIWatcher)(nil).Init), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockWatcher)(nil).Name)) } -// Refresh mocks base method. -func (m *MockIWatcher) Refresh(ctx context.Context) error { +// Start mocks base method. +func (m *MockWatcher) Start(ctx context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Refresh", ctx) + ret := m.ctrl.Call(m, "Start", ctx) ret0, _ := ret[0].(error) return ret0 } -// Refresh indicates an expected call of Refresh. -func (mr *MockIWatcherMockRecorder) Refresh(ctx any) *gomock.Call { +// Start indicates an expected call of Start. +func (mr *MockWatcherMockRecorder) Start(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Refresh", reflect.TypeOf((*MockIWatcher)(nil).Refresh), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockWatcher)(nil).Start), ctx) } // Stop mocks base method. -func (m *MockIWatcher) Stop(ctx context.Context) error { +func (m *MockWatcher) Stop(ctx context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Stop", ctx) ret0, _ := ret[0].(error) @@ -76,36 +76,36 @@ func (m *MockIWatcher) Stop(ctx context.Context) error { } // Stop indicates an expected call of Stop. -func (mr *MockIWatcherMockRecorder) Stop(ctx any) *gomock.Call { +func (mr *MockWatcherMockRecorder) Stop(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockIWatcher)(nil).Stop), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockWatcher)(nil).Stop), ctx) } -// MockIWatcherManager is a mock of IWatcherManager interface. -type MockIWatcherManager struct { +// MockManager is a mock of Manager interface. +type MockManager struct { ctrl *gomock.Controller - recorder *MockIWatcherManagerMockRecorder + recorder *MockManagerMockRecorder } -// MockIWatcherManagerMockRecorder is the mock recorder for MockIWatcherManager. -type MockIWatcherManagerMockRecorder struct { - mock *MockIWatcherManager +// MockManagerMockRecorder is the mock recorder for MockManager. +type MockManagerMockRecorder struct { + mock *MockManager } -// NewMockIWatcherManager creates a new mock instance. -func NewMockIWatcherManager(ctrl *gomock.Controller) *MockIWatcherManager { - mock := &MockIWatcherManager{ctrl: ctrl} - mock.recorder = &MockIWatcherManagerMockRecorder{mock} +// NewMockManager creates a new mock instance. +func NewMockManager(ctrl *gomock.Controller) *MockManager { + mock := &MockManager{ctrl: ctrl} + mock.recorder = &MockManagerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockIWatcherManager) EXPECT() *MockIWatcherManagerMockRecorder { +func (m *MockManager) EXPECT() *MockManagerMockRecorder { return m.recorder } // Start mocks base method. -func (m *MockIWatcherManager) Start(ctx context.Context) error { +func (m *MockManager) Start(ctx context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Start", ctx) ret0, _ := ret[0].(error) @@ -113,13 +113,13 @@ func (m *MockIWatcherManager) Start(ctx context.Context) error { } // Start indicates an expected call of Start. -func (mr *MockIWatcherManagerMockRecorder) Start(ctx any) *gomock.Call { +func (mr *MockManagerMockRecorder) Start(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockIWatcherManager)(nil).Start), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockManager)(nil).Start), ctx) } // Stop mocks base method. -func (m *MockIWatcherManager) Stop(ctx context.Context) error { +func (m *MockManager) Stop(ctx context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Stop", ctx) ret0, _ := ret[0].(error) @@ -127,7 +127,7 @@ func (m *MockIWatcherManager) Stop(ctx context.Context) error { } // Stop indicates an expected call of Stop. -func (mr *MockIWatcherManagerMockRecorder) Stop(ctx any) *gomock.Call { +func (mr *MockManagerMockRecorder) Stop(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockIWatcherManager)(nil).Stop), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockManager)(nil).Stop), ctx) } diff --git a/pkg/managers/watchermanager/types.go b/pkg/managers/watchermanager/types.go index 2417650c06..170928eea6 100644 --- a/pkg/managers/watchermanager/types.go +++ b/pkg/managers/watchermanager/types.go @@ -5,29 +5,24 @@ package watchermanager import ( "context" - "sync" - "time" "github.com/microsoft/retina/pkg/log" ) //go:generate go run go.uber.org/mock/mockgen@v0.4.0 -source=types.go -destination=mocks/mock_types.go -package=mocks . -type IWatcher interface { - // Init, Stop, and Refresh should only be called by watchermanager. - Init(ctx context.Context) error +type Watcher interface { + // Start and Stop should only be called by watchermanager. + Start(ctx context.Context) error Stop(ctx context.Context) error - Refresh(ctx context.Context) error + Name() string } -type IWatcherManager interface { +type Manager interface { Start(ctx context.Context) error Stop(ctx context.Context) error } type WatcherManager struct { - Watchers []IWatcher - l *log.ZapLogger - refreshRate time.Duration - cancel context.CancelFunc - wg sync.WaitGroup + Watchers []Watcher + l *log.ZapLogger } diff --git a/pkg/managers/watchermanager/watchermanager.go b/pkg/managers/watchermanager/watchermanager.go index 55617db286..390a283854 100644 --- a/pkg/managers/watchermanager/watchermanager.go +++ b/pkg/managers/watchermanager/watchermanager.go @@ -5,13 +5,14 @@ package watchermanager import ( "context" - "fmt" "time" "github.com/microsoft/retina/pkg/log" "github.com/microsoft/retina/pkg/watchers/apiserver" "github.com/microsoft/retina/pkg/watchers/endpoint" + "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -21,61 +22,39 @@ const ( func NewWatcherManager() *WatcherManager { return &WatcherManager{ - Watchers: []IWatcher{ - endpoint.Watcher(), - apiserver.Watcher(), + Watchers: []Watcher{ + apiserver.NewWatcher(), + endpoint.NewWatcher(), }, - l: log.Logger().Named("watcher-manager"), - refreshRate: DefaultRefreshRate, + l: log.Logger().Named("watcher-manager"), } } func (wm *WatcherManager) Start(ctx context.Context) error { - newCtx, cancelCtx := context.WithCancel(ctx) - wm.cancel = cancelCtx - + wm.l.Info("starting watcher manager") + // start all watchers + g, ctx := errgroup.WithContext(ctx) for _, w := range wm.Watchers { - if err := w.Init(ctx); err != nil { - wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err)) - return err - } - wm.wg.Add(1) - go wm.runWatcher(newCtx, w) - wm.l.Info("watcher started", zap.String("watcher_type", fmt.Sprintf("%T", w))) + w := w + g.Go(func() error { + wm.l.Info("starting watcher", zap.String("name", w.Name())) + err := w.Start(ctx) + if err != nil { + wm.l.Error("watcher exited with error", zap.Error(err), zap.String("name", w.Name())) + return errors.Wrap(err, "watcher exited with error") + } + return nil + }) + } + err := g.Wait() + if err != nil { + wm.l.Error("watcher manager exited with error", zap.Error(err)) + return errors.Wrap(err, "watcher manager exited with error") } return nil } func (wm *WatcherManager) Stop(ctx context.Context) error { - if wm.cancel != nil { - wm.cancel() // cancel all runWatcher - } - for _, w := range wm.Watchers { - if err := w.Stop(ctx); err != nil { - wm.l.Error("failed to stop", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err)) - return err - } - } - wm.wg.Wait() // wait for all runWatcher to stop wm.l.Info("watcher manager stopped") return nil } - -func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error { - defer wm.wg.Done() // signal that this runWatcher is done - ticker := time.NewTicker(wm.refreshRate) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - wm.l.Info("watcher stopping...", zap.String("watcher_type", fmt.Sprintf("%T", w))) - return nil - case <-ticker.C: - err := w.Refresh(ctx) - if err != nil { - wm.l.Error("refresh failed", zap.Error(err)) - return err - } - } - } -} diff --git a/pkg/managers/watchermanager/watchermanager_test.go b/pkg/managers/watchermanager/watchermanager_test.go index 37dcf4809f..a611a29464 100644 --- a/pkg/managers/watchermanager/watchermanager_test.go +++ b/pkg/managers/watchermanager/watchermanager_test.go @@ -4,71 +4,32 @@ package watchermanager import ( "context" - "errors" "testing" "github.com/microsoft/retina/pkg/log" - mock "github.com/microsoft/retina/pkg/managers/watchermanager/mocks" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" ) -var errInitFailed = errors.New("init failed") - func TestStopWatcherManagerGracefully(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() log.SetupZapLogger(log.GetDefaultLogOpts()) mgr := NewWatcherManager() - mockAPIServerWatcher := mock.NewMockIWatcher(ctl) - mockEndpointWatcher := mock.NewMockIWatcher(ctl) - - mgr.Watchers = []IWatcher{ - mockEndpointWatcher, - mockAPIServerWatcher, - } - - mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() - mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() - - mockEndpointWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() - mockAPIServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes() - - ctx, _ := context.WithCancel(context.Background()) + ctx := context.Background() g, errctx := errgroup.WithContext(ctx) + var err error g.Go(func() error { - return mgr.Start(errctx) + err = mgr.Start(errctx) + return err }) - err := g.Wait() - mgr.Stop(errctx) require.NoError(t, err) } -func TestWatcherInitFailsGracefully(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - log.SetupZapLogger(log.GetDefaultLogOpts()) - - mockAPIServerWatcher := mock.NewMockIWatcher(ctl) - mockEndpointWatcher := mock.NewMockIWatcher(ctl) - - mgr := NewWatcherManager() - mgr.Watchers = []IWatcher{ - mockAPIServerWatcher, - mockEndpointWatcher, - } - - mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes() - mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes() - - err := mgr.Start(context.Background()) - require.NotNil(t, err, "Expected error when starting watcher manager") -} - func TestWatcherStopWithoutStart(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() diff --git a/pkg/plugin/packetparser/packetparser_linux.go b/pkg/plugin/packetparser/packetparser_linux.go index fb54c8bb5e..37d72bc3c4 100644 --- a/pkg/plugin/packetparser/packetparser_linux.go +++ b/pkg/plugin/packetparser/packetparser_linux.go @@ -382,23 +382,26 @@ func (p *packetParser) endpointWatcherCallbackFn(obj interface{}) { iface := event.Obj.(netlink.LinkAttrs) ifaceKey := ifaceToKey(iface) - lockMapVal, _ := p.interfaceLockMap.LoadOrStore(ifaceKey, &sync.Mutex{}) - mu := lockMapVal.(*sync.Mutex) - mu.Lock() - defer mu.Unlock() + _, ifaceExist := p.interfaceLockMap.LoadOrStore(ifaceKey, struct{}{}) switch event.Type { case endpoint.EndpointCreated: - p.l.Debug("Endpoint created", zap.String("name", iface.Name)) - p.createQdiscAndAttach(iface, Veth) + if !ifaceExist { + p.l.Debug("Endpoint created", zap.String("name", iface.Name)) + p.createQdiscAndAttach(iface, Veth) + } case endpoint.EndpointDeleted: - p.l.Debug("Endpoint deleted", zap.String("name", iface.Name)) - // Clean. - if value, ok := p.tcMap.Load(ifaceKey); ok { - v := value.(*tcValue) - p.clean(v.tc, v.qdisc) - // Delete from map. - p.tcMap.Delete(ifaceKey) + if ifaceExist { + p.l.Debug("Endpoint deleted", zap.String("name", iface.Name)) + // Clean. + if value, ok := p.tcMap.Load(ifaceKey); ok { + v := value.(*tcValue) + p.clean(v.tc, v.qdisc) + // Delete from map. + p.tcMap.Delete(ifaceKey) + } + // Delete from interfaceLockMap. + p.interfaceLockMap.Delete(ifaceKey) } default: // Unknown. diff --git a/pkg/plugin/packetparser/types_linux.go b/pkg/plugin/packetparser/types_linux.go index 4fc06f35c3..69ddfe8d74 100644 --- a/pkg/plugin/packetparser/types_linux.go +++ b/pkg/plugin/packetparser/types_linux.go @@ -114,7 +114,7 @@ type packetParser struct { tcMap *sync.Map reader perfReader enricher enricher.EnricherInterface - // interfaceLockMap is a map of key to *sync.Mutex. + // interfaceLockMap is a map of exisiting interfaces interfaceLockMap *sync.Map endpointIngressInfo *ebpf.ProgramInfo endpointEgressInfo *ebpf.ProgramInfo diff --git a/pkg/watchers/apiserver/apiserver.go b/pkg/watchers/apiserver/apiserver.go index 606c2cde74..52d91d746f 100644 --- a/pkg/watchers/apiserver/apiserver.go +++ b/pkg/watchers/apiserver/apiserver.go @@ -9,208 +9,167 @@ import ( "net" "net/url" "strings" + "time" "github.com/microsoft/retina/pkg/common" cc "github.com/microsoft/retina/pkg/controllers/cache" "github.com/microsoft/retina/pkg/log" fm "github.com/microsoft/retina/pkg/managers/filtermanager" "github.com/microsoft/retina/pkg/pubsub" - "github.com/microsoft/retina/pkg/utils" "go.uber.org/zap" - "k8s.io/client-go/rest" kcfg "sigs.k8s.io/controller-runtime/pkg/client/config" ) -const ( - filterManagerRetries = 3 - hostLookupRetries = 6 // 6 retries for a total of 63 seconds. -) - -type ApiServerWatcher struct { - isRunning bool - l *log.ZapLogger - current cache - new cache - apiServerHostName string - hostResolver IHostResolver - filterManager fm.IFilterManager - restConfig *rest.Config -} - -var a *ApiServerWatcher - -// Watcher creates a new ApiServerWatcher instance. -func Watcher() *ApiServerWatcher { - if a == nil { - a = &ApiServerWatcher{ - isRunning: false, - l: log.Logger().Named("apiserver-watcher"), - current: make(cache), - hostResolver: net.DefaultResolver, - } - } - - return a +func (w *Watcher) Name() string { + return watcherName } -func (a *ApiServerWatcher) Init(ctx context.Context) error { - if a.isRunning { - a.l.Info("apiserver watcher is already running") - return nil - } - - // Get filter manager. - if a.filterManager == nil { - var err error - a.filterManager, err = fm.Init(filterManagerRetries) - if err != nil { - a.l.Error("failed to init filter manager", zap.Error(err)) - return fmt.Errorf("failed to init filter manager: %w", err) +// Start the apiserver watcher. +func (w *Watcher) Start(ctx context.Context) error { + ticker := time.NewTicker(w.refreshRate) + for { + select { + case <-ctx.Done(): + w.l.Info("context done, stopping apiserver watcher") + return nil + case <-ticker.C: + err := w.initNewCache(ctx) + if err != nil { + return err + } + // Compare the new ips with the old ones. + created, deleted := w.diffCache() + + // Publish the new ips. + createdIps := []net.IP{} + deletedIps := []net.IP{} + + for _, v := range created { + w.l.Info("New Apiserver ips:", zap.Any("ip", v)) + ip := net.ParseIP(v.(string)).To4() + createdIps = append(createdIps, ip) + } + + for _, v := range deleted { + w.l.Info("Deleted Apiserver ips:", zap.Any("ip", v)) + ip := net.ParseIP(v.(string)).To4() + deletedIps = append(deletedIps, ip) + } + + if len(createdIps) > 0 { + // Publish the new ips. + w.publish(createdIps, cc.EventTypeAddAPIServerIPs) + // Add ips to filter manager if any. + err := w.filtermanager.AddIPs(createdIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) + if err != nil { + w.l.Error("Failed to add ips to filter manager", zap.Error(err)) + } + } + + if len(deletedIps) > 0 { + // Publish the deleted ips. + w.publish(deletedIps, cc.EventTypeDeleteAPIServerIPs) + // Delete ips from filter manager if any. + err := w.filtermanager.DeleteIPs(deletedIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) + if err != nil { + w.l.Error("Failed to delete ips from filter manager", zap.Error(err)) + } + } + + // update the current cache and reset the new cache + w.current = w.new.deepcopy() + w.new = nil } } - - // Get kubeconfig. - if a.restConfig == nil { - config, err := kcfg.GetConfig() - if err != nil { - a.l.Error("failed to get kubeconfig", zap.Error(err)) - return fmt.Errorf("failed to get kubeconfig: %w", err) - } - a.restConfig = config - } - - hostName, err := a.getHostName() - if err != nil { - a.l.Error("failed to get host name", zap.Error(err)) - return fmt.Errorf("failed to get host name: %w", err) - } - a.apiServerHostName = hostName - - a.isRunning = true - - return nil } -// Stop stops the ApiServerWatcher. -func (a *ApiServerWatcher) Stop(ctx context.Context) error { - if !a.isRunning { - a.l.Info("apiserver watcher is not running") - return nil - } - a.isRunning = false +// Stop the apiserver watcher. +func (w *Watcher) Stop(_ context.Context) error { + w.l.Info("stopping apiserver watcher") return nil } -func (a *ApiServerWatcher) Refresh(ctx context.Context) error { - err := a.initNewCache(ctx) +func (w *Watcher) initNewCache(ctx context.Context) error { + ips, err := w.getAPIServerIPs(ctx) if err != nil { - a.l.Error("failed to initialize new cache", zap.Error(err)) return err } - // Compare the new IPs with the old ones. - created, deleted := a.diffCache() - - createdIPs := []net.IP{} - deletedIPs := []net.IP{} - - for _, v := range created { - a.l.Info("New Apiserver IPs:", zap.Any("ip", v)) - ip := net.ParseIP(v.(string)).To4() - createdIPs = append(createdIPs, ip) - } - - for _, v := range deleted { - a.l.Info("Deleted Apiserver IPs:", zap.Any("ip", v)) - ip := net.ParseIP(v.(string)).To4() - deletedIPs = append(deletedIPs, ip) + // Reset the new cache. + w.new = make(cache) + for _, ip := range ips { + w.new[ip] = struct{}{} } + return nil +} - if len(createdIPs) > 0 { - a.publish(createdIPs, cc.EventTypeAddAPIServerIPs) - err := a.filterManager.AddIPs(createdIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) - if err != nil { - a.l.Error("Failed to add IPs to filter manager", zap.Error(err)) +func (w *Watcher) diffCache() (created, deleted []interface{}) { + // check if there are new ips + for k := range w.new { + if _, ok := w.current[k]; !ok { + created = append(created, k) } } - if len(deletedIPs) > 0 { - a.publish(deletedIPs, cc.EventTypeDeleteAPIServerIPs) - err := a.filterManager.DeleteIPs(deletedIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"}) - if err != nil { - a.l.Error("Failed to delete IPs from filter manager", zap.Error(err)) + // check if there are deleted ips + for k := range w.current { + if _, ok := w.new[k]; !ok { + deleted = append(deleted, k) } } - - a.current = a.new.deepcopy() - a.new = nil - - return nil + return } -func (a *ApiServerWatcher) initNewCache(ctx context.Context) error { - ips, err := a.resolveIPs(ctx, a.apiServerHostName) +func (w *Watcher) getAPIServerIPs(ctx context.Context) ([]string, error) { + // Parse the URL + host, err := w.retrieveAPIServerHostname() if err != nil { - return fmt.Errorf("failed to resolve IPs: %w", err) + return nil, err } - // Reset new cache. - a.new = make(cache) - for _, ip := range ips { - a.new[ip] = struct{}{} + // Get the ips for the host + ips, err := w.resolveIPs(ctx, host) + if err != nil { + return nil, err } - return nil + + return ips, nil } -func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) { - // Check if there are any new IPs. - for k := range a.new { - if _, ok := a.current[k]; !ok { - created = append(created, k) - } +// parse url to extract hostname +func (w *Watcher) retrieveAPIServerHostname() (string, error) { + // Parse the URL + parsedURL, err := url.Parse(w.apiServerURL) + if err != nil { + fmt.Println("Failed to parse URL:", err) + return "", err } - // Check if there are any deleted IPs. - for k := range a.current { - if _, ok := a.new[k]; !ok { - deleted = append(deleted, k) - } + // Remove the scheme (http:// or https://) and port from the host + host := strings.TrimPrefix(parsedURL.Host, "www.") + colonIndex := strings.IndexByte(host, ':') + if colonIndex != -1 { + host = host[:colonIndex] } - return + return host, nil } -func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) { - // perform a DNS lookup for the host URL using the net.DefaultResolver which uses the local resolver. - // Possible errors here are: - // - Canceled context: The context was canceled before the lookup completed. - // -DNS server errors ie NXDOMAIN, SERVFAIL. - // - Network errors ie timeout, unreachable DNS server. - // -Other DNS-related errors encapsulated in a DNSError. - var hostIPs []string - var err error - - retryFunc := func() error { - hostIPs, err = a.hostResolver.LookupHost(ctx, host) - if err != nil { - return fmt.Errorf("APIServer LookupHost failed: %w", err) - } - return nil - } - - // Retry the lookup for hostIPs in case of failure. - err = utils.Retry(retryFunc, hostLookupRetries) +// Resolve the list of ips for the given host +func (w *Watcher) resolveIPs(ctx context.Context, host string) ([]string, error) { + hostIps, err := w.hostResolver.LookupHost(ctx, host) if err != nil { return nil, err } - if len(hostIPs) == 0 { - a.l.Debug("no IPs found for host", zap.String("host", host)) + if len(hostIps) == 0 { + w.l.Error("no ips found for host", zap.String("host", host)) + return nil, fmt.Errorf("no ips found for host %s", host) } - return hostIPs, nil + return hostIps, nil } -func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { +func (w *Watcher) publish(netIPs []net.IP, eventType cc.EventType) { if len(netIPs) == 0 { return } @@ -220,23 +179,30 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) { ipsToPublish = append(ipsToPublish, ip.String()) } ps := pubsub.New() - ps.Publish(common.PubSubAPIServer, cc.NewCacheEvent(eventType, common.NewAPIServerObject(ipsToPublish))) - a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish)) + ps.Publish(common.PubSubAPIServer, + cc.NewCacheEvent( + eventType, + common.NewAPIServerObject(ipsToPublish), + ), + ) + w.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish)) } -func (a *ApiServerWatcher) getHostName() (string, error) { - // Parse the host URL. - hostURL := a.restConfig.Host - parsedURL, err := url.ParseRequestURI(hostURL) +// getHostURL returns the host url from the config. +func getHostURL() string { + config, err := kcfg.GetConfig() if err != nil { - log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err)) - return "", fmt.Errorf("failed to parse URL: %w", err) + log.Logger().Error("failed to get config", zap.Error(err)) + return "" } + return config.Host +} - // Extract the host name from the URL. - host := strings.TrimPrefix(parsedURL.Host, "www.") - if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 { - host = host[:colonIndex] +// Get FilterManager +func (w *Watcher) getFilterManager() *fm.FilterManager { + f, err := fm.Init(filterManagerRetries) + if err != nil { + w.l.Error("failed to init filter manager", zap.Error(err)) } - return host, nil + return f } diff --git a/pkg/watchers/apiserver/apiserver_test.go b/pkg/watchers/apiserver/apiserver_test.go index 04105e85c1..903f0034ae 100644 --- a/pkg/watchers/apiserver/apiserver_test.go +++ b/pkg/watchers/apiserver/apiserver_test.go @@ -18,52 +18,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - "k8s.io/client-go/rest" ) -var errDNS = errors.New("DNS error") - -func TestGetWatcher(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - - a := Watcher() - assert.NotNil(t, a) - - a_again := Watcher() - assert.Equal(t, a, a_again, "Expected the same veth watcher instance") -} - -func TestAPIServerWatcherStop(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) - - // When apiserver is already stopped. - a := &ApiServerWatcher{ - isRunning: false, - l: log.Logger().Named("apiserver-watcher"), - filterManager: mockedFilterManager, - restConfig: getMockConfig(true), - } - err := a.Stop(ctx) - assert.NoError(t, err, "Expected no error when stopping a stopped apiserver watcher") - assert.Equal(t, false, a.isRunning, "Expected apiserver watcher to be stopped") - - // Start the watcher. - err = a.Init(ctx) - assert.NoError(t, err, "Expected no error when starting a stopped apiserver watcher") - - // Stop the watcher. - err = a.Stop(ctx) - assert.NoError(t, err, "Expected no error when stopping a running apiserver watcher") - assert.Equal(t, false, a.isRunning, "Expected apiserver watcher to be stopped") -} - -func TestRefresh(t *testing.T) { +func TestStart(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -74,10 +31,12 @@ func TestRefresh(t *testing.T) { mockedResolver := mocks.NewMockIHostResolver(ctrl) mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) - a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), + w := &Watcher{ + l: log.Logger().Named(watcherName), + apiServerURL: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, - filterManager: mockedFilterManager, + filtermanager: mockedFilterManager, + refreshRate: 1 * time.Second, } // Return 2 random IPs for the host everytime LookupHost is called. @@ -88,8 +47,8 @@ func TestRefresh(t *testing.T) { mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - a.Refresh(ctx) - assert.NoError(t, a.Refresh(context.Background()), "Expected no error when refreshing the cache") + err := w.Start(ctx) // watcher will timeout after 20 seconds + require.NoError(t, err, "Expected no error when refreshing the cache") } func TestDiffCache(t *testing.T) { @@ -107,19 +66,21 @@ func TestDiffCache(t *testing.T) { new["192.168.1.2"] = struct{}{} new["192.168.1.3"] = struct{}{} - a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), + w := &Watcher{ + l: log.Logger().Named(watcherName), + apiServerURL: "https://kubernetes.default.svc.cluster.local:443", hostResolver: mockedResolver, current: old, new: new, + refreshRate: 1 * time.Second, } - created, deleted := a.diffCache() + created, deleted := w.diffCache() assert.Equal(t, 1, len(created), "Expected 1 created host") assert.Equal(t, 1, len(deleted), "Expected 1 deleted host") } -func TestRefreshLookUpAlwaysFail(t *testing.T) { +func TestStartError(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -128,19 +89,23 @@ func TestRefreshLookUpAlwaysFail(t *testing.T) { defer cancel() mockedResolver := mocks.NewMockIHostResolver(ctrl) + mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) - a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, + w := &Watcher{ + l: log.Logger().Named(watcherName), + apiServerURL: "https://kubernetes.default.svc.cluster.local:443", + hostResolver: mockedResolver, + filtermanager: mockedFilterManager, + refreshRate: 1 * time.Second, } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errors.New("Error")).AnyTimes() - a.Refresh(ctx) - require.Error(t, a.Refresh(context.Background()), "Expected error when refreshing the cache") + err := w.Start(ctx) + require.Error(t, err, "Expected error when refreshing the cache") } -func TestInitWithIncorrectURL(t *testing.T) { +func TestResolveIPEmpty(t *testing.T) { log.SetupZapLogger(log.GetDefaultLogOpts()) ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -149,62 +114,20 @@ func TestInitWithIncorrectURL(t *testing.T) { defer cancel() mockedResolver := mocks.NewMockIHostResolver(ctrl) - mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) - a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, - restConfig: getMockConfig(false), - filterManager: mockedFilterManager, + w := &Watcher{ + l: log.Logger().Named(watcherName), + apiServerURL: "https://kubernetes.default.svc.cluster.local:443", + hostResolver: mockedResolver, + refreshRate: 1 * time.Second, } mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{}, nil).AnyTimes() - require.Error(t, a.Init(ctx), "Expected error during init") + + err := w.Start(ctx) + require.Error(t, err, "Expected error when resolving the IP") } func randomIP() string { return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256)) } - -// Mock function to simulate getting a Kubernetes config -func getMockConfig(isCorrect bool) *rest.Config { - if isCorrect { - return &rest.Config{ - Host: "https://kubernetes.default.svc.cluster.local:443", - } - } - return &rest.Config{ - Host: "", - } -} - -func TestRefreshFailsFirstFourAttemptsSucceedsOnFifth(t *testing.T) { - _, err := log.SetupZapLogger(log.GetDefaultLogOpts()) - require.NoError(t, err) - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - mockedResolver := mocks.NewMockIHostResolver(ctrl) - mockedFilterManager := filtermanagermocks.NewMockIFilterManager(ctrl) - - a := &ApiServerWatcher{ - l: log.Logger().Named("apiserver-watcher"), - hostResolver: mockedResolver, - filterManager: mockedFilterManager, - } - - // Simulate LookupHost failing the first four times and succeeding on the fifth. - gomock.InOrder( - mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return(nil, errDNS).Times(4), - mockedResolver.EXPECT().LookupHost(gomock.Any(), gomock.Any()).Return([]string{"127.0.0.1"}, nil).Times(1), - ) - - mockedFilterManager.EXPECT().AddIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockedFilterManager.EXPECT().DeleteIPs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - - err = a.Refresh(ctx) - require.NoError(t, err, "Expected no error when refreshing the cache") -} diff --git a/pkg/watchers/apiserver/types.go b/pkg/watchers/apiserver/types.go index bf8952ab13..af0f7cf01c 100644 --- a/pkg/watchers/apiserver/types.go +++ b/pkg/watchers/apiserver/types.go @@ -3,13 +3,49 @@ package apiserver -import "context" +import ( + "context" + "net" + "time" + + "github.com/microsoft/retina/pkg/log" + fm "github.com/microsoft/retina/pkg/managers/filtermanager" +) //go:generate go run go.uber.org/mock/mockgen@v0.4.0 -source=types.go -destination=mocks/mock_types.go -package=mocks . type IHostResolver interface { LookupHost(context context.Context, host string) ([]string, error) } +const ( + watcherName = "apiserver-watcher" + filterManagerRetries = 3 + defaultRefreshRate = 30 * time.Second +) + +type Watcher struct { + l *log.ZapLogger + current cache + new cache + apiServerURL string + hostResolver IHostResolver + filtermanager fm.IFilterManager + refreshRate time.Duration +} + +// NewWatcher creates a new apiserver watcher. +func NewWatcher() *Watcher { + w := &Watcher{ + l: log.Logger().Named(watcherName), + current: make(cache), + apiServerURL: getHostURL(), + hostResolver: net.DefaultResolver, + refreshRate: defaultRefreshRate, + } + w.filtermanager = w.getFilterManager() + return w +} + // define cache as a set type cache map[string]struct{} diff --git a/pkg/watchers/endpoint/endpoint.go b/pkg/watchers/endpoint/endpoint.go deleted file mode 100644 index 1a90ea451f..0000000000 --- a/pkg/watchers/endpoint/endpoint.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -package endpoint - -import ( - "context" - - "github.com/microsoft/retina/pkg/common" - "github.com/microsoft/retina/pkg/log" - "github.com/microsoft/retina/pkg/pubsub" - "go.uber.org/zap" -) - -type EndpointWatcher struct { - isRunning bool - l *log.ZapLogger - current cache - new cache - p pubsub.PubSubInterface -} - -var e *EndpointWatcher - -// NewEndpointWatcher creates a new endpoint watcher. -func Watcher() *EndpointWatcher { - if e == nil { - e = &EndpointWatcher{ - isRunning: false, - l: log.Logger().Named("endpoint-watcher"), - p: pubsub.New(), - current: make(cache), - } - } - - return e -} - -func (e *EndpointWatcher) Init(ctx context.Context) error { - if e.isRunning { - e.l.Info("endpoint watcher is already running") - return nil - } - e.isRunning = true - return nil -} - -func (e *EndpointWatcher) Stop(ctx context.Context) error { - if !e.isRunning { - e.l.Info("endpoint watcher is not running") - return nil - } - e.isRunning = false - return nil -} - -func (e *EndpointWatcher) Refresh(ctx context.Context) error { - // initNewCache is OS specific. - // Based on GOOS, will be implemented by either endpoint_linux, or - // endpoint_windows. - err := e.initNewCache() - if err != nil { - return err - } - - // Compare the new veths with the old ones. - created, deleted := e.diffCache() - - // Publish the new veths. - for _, v := range created { - e.l.Debug("Endpoint created", zap.Any("veth", v)) - e.p.Publish(common.PubSubEndpoints, NewEndpointEvent(EndpointCreated, v)) - } - - // Publish the deleted veths. - for _, v := range deleted { - e.l.Debug("Endpoint deleted", zap.Any("veth", v)) - e.p.Publish(common.PubSubEndpoints, NewEndpointEvent(EndpointDeleted, v)) - } - - // Update the cache and reset the new cache. - e.current = e.new.deepcopy() - e.new = nil - - return nil -} - -// Function to differentiate between two caches. -func (e *EndpointWatcher) diffCache() (created, deleted []interface{}) { - // Check if there are any new veths. - for k, v := range e.new { - if _, ok := e.current[k]; !ok { - created = append(created, v) - } - } - - // Check if there are any deleted veths. - for k, v := range e.current { - if _, ok := e.new[k]; !ok { - deleted = append(deleted, v) - } - } - return -} diff --git a/pkg/watchers/endpoint/endpoint_linux.go b/pkg/watchers/endpoint/endpoint_linux.go deleted file mode 100644 index 40a601e1f6..0000000000 --- a/pkg/watchers/endpoint/endpoint_linux.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -package endpoint - -import ( - "github.com/vishvananda/netlink" -) - -var showLink = netlink.LinkList - -func (e *EndpointWatcher) initNewCache() error { - veths, err := listVeths() - if err != nil { - return err - } - - // Reset new cache. - e.new = make(cache) - for _, veth := range veths { - k := key{ - name: veth.Attrs().Name, - hardwareAddr: veth.Attrs().HardwareAddr.String(), - netNsID: veth.Attrs().NetNsID, - } - - e.new[k] = *veth.Attrs() - } - - return nil -} - -// Helper functions. - -// Get all the veth interfaces. -// Similar to ip link show type veth -func listVeths() ([]netlink.Link, error) { - links, err := showLink() - if err != nil { - return nil, err - } - - var veths []netlink.Link - for _, link := range links { - // Ref: https://github.com/vishvananda/netlink/blob/ced5aaba43e3f25bb5f04860641d3e3dd04a8544/link.go#L367 - // Unfortunately, there is no type/constant defined for "veth" in the netlink package. - // Version of netlink tested - https://github.com/vishvananda/netlink/tree/v1.2.1-beta.2 - if link.Type() == "veth" { - veths = append(veths, link) - } - } - - return veths, nil -} diff --git a/pkg/watchers/endpoint/endpoint_linux_test.go b/pkg/watchers/endpoint/endpoint_linux_test.go deleted file mode 100644 index a9fbc439e2..0000000000 --- a/pkg/watchers/endpoint/endpoint_linux_test.go +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. -// nolint - -package endpoint - -import ( - "context" - "errors" - "net" - "testing" - - "github.com/microsoft/retina/pkg/log" - "github.com/microsoft/retina/pkg/pubsub" - "github.com/stretchr/testify/assert" - "github.com/vishvananda/netlink" -) - -func TestGetWatcher(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - - v := Watcher() - assert.NotNil(t, v) - - v_again := Watcher() - assert.Equal(t, v, v_again, "Expected the same veth watcher instance") -} - -func TestEndpointWatcherStart(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - c := context.Background() - - // When veth is already running. - v := &EndpointWatcher{ - isRunning: true, - l: log.Logger().Named("veth-watcher"), - } - err := v.Init(c) - assert.NoError(t, err, "Expected no error when starting a running veth watcher") - assert.Equal(t, true, v.isRunning, "Expected veth watcher to be running") - - // When veth is not running. - v.isRunning = false - err = v.Init(c) - assert.NoError(t, err, "Expected no error when starting a stopped veth watcher") - assert.Equal(t, true, v.isRunning, "Expected veth watcher to be running") - - // Stop the watcher. - err = v.Stop(c) - assert.NoError(t, err, "Expected no error when stopping a running veth watcher") - - // Restart the watcher. - err = v.Init(c) - assert.NoError(t, err, "Expected no error when starting a stopped veth watcher") - assert.Equal(t, true, v.isRunning, "Expected veth watcher to be running") - - // Stop the watcher. - err = v.Stop(c) - assert.NoError(t, err, "Expected no error when stopping a running veth watcher") -} - -func TestEndpointWatcherStop(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - c := context.Background() - - // When veth is already stopped. - v := &EndpointWatcher{ - isRunning: false, - l: log.Logger().Named("veth-watcher"), - } - err := v.Stop(c) - assert.NoError(t, err, "Expected no error when stopping a stopped veth watcher") - assert.Equal(t, false, v.isRunning, "Expected veth watcher to be stopped") - - // Start the watcher. - err = v.Init(c) - assert.NoError(t, err, "Expected no error when starting a stopped veth watcher") - - // Stop the watcher. - err = v.Stop(c) - assert.NoError(t, err, "Expected no error when stopping a running veth watcher") - assert.Equal(t, false, v.isRunning, "Expected veth watcher to be stopped") -} - -func TestRun(t *testing.T) { - showLink = func() ([]netlink.Link, error) { - return []netlink.Link{ - &netlink.Veth{ - LinkAttrs: netlink.LinkAttrs{ - Name: "veth0", - }, - }, - &netlink.Vxlan{ - LinkAttrs: netlink.LinkAttrs{ - Name: "eth0", - }, - }, - }, nil - } - - links, err := listVeths() - assert.NoError(t, err, "Expected no error when listing veths") - assert.Equal(t, 1, len(links), "Expected to find 1 veth") - assert.Equal(t, "veth0", links[0].Attrs().Name, "Expected to find veth0") -} - -func TestDiffCache(t *testing.T) { - old := cache{ - key{ - name: "veth0", - hardwareAddr: "00:00:00:00:00:00", - netNsID: 0, - }: netlink.LinkAttrs{ - Name: "veth0", - }, - } - new := cache{ - key{ - name: "veth1", - hardwareAddr: "00:00:00:00:00:FF", - netNsID: 1, - }: netlink.LinkAttrs{ - Name: "veth1", - }, - } - e := &EndpointWatcher{current: old, new: new} - c, d := e.diffCache() - assert.Equal(t, 1, len(c), "Expected to find 1 created veth") - assert.Equal(t, 1, len(d), "Expected to find 1 deleted veth") - assert.Equal(t, "veth1", c[0].(netlink.LinkAttrs).Name, "Expected to find veth1") - assert.Equal(t, "veth0", d[0].(netlink.LinkAttrs).Name, "Expected to find veth0") -} - -func TestRefreshAndCallback(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - c := context.Background() - - showLink = func() ([]netlink.Link, error) { - return []netlink.Link{ - &netlink.Veth{ - LinkAttrs: netlink.LinkAttrs{ - Name: "veth0", - HardwareAddr: func() net.HardwareAddr { - mac, _ := net.ParseMAC("00:00:00:00:00:00") - return mac - }(), - NetNsID: 0, - }, - }, - &netlink.Veth{ - LinkAttrs: netlink.LinkAttrs{ - Name: "veth1", - HardwareAddr: func() net.HardwareAddr { - mac, _ := net.ParseMAC("00:00:00:00:00:01") - return mac - }(), - NetNsID: 1, - }, - }, - }, nil - } - - cache := make(cache) - cache[key{ - name: "veth2", - hardwareAddr: "00:00:00:00:00:02", - netNsID: 2, - }] = &netlink.Veth{ - LinkAttrs: netlink.LinkAttrs{ - Name: "veth2", - }, - } - - v := &EndpointWatcher{ - isRunning: true, - current: cache, - l: log.Logger().Named("veth-watcher"), - p: pubsub.New(), - } - - // When cache is empty. - assert.Equal(t, 1, len(v.current), "Expected to find 0 veths") - - // Post refresh. - err := v.Refresh(c) - assert.NoError(t, err, "Expected no error when refreshing veth cache") - assert.Equal(t, 2, len(v.current), "Expected to find 2 veths") - assert.Equal(t, "veth0", v.current[key{ - name: "veth0", - hardwareAddr: "00:00:00:00:00:00", - netNsID: 0, - }].(netlink.LinkAttrs).Name, "Expected to find veth0") - assert.Equal(t, "veth1", v.current[key{ - name: "veth1", - hardwareAddr: "00:00:00:00:00:01", - netNsID: 1, - }].(netlink.LinkAttrs).Name, "Expected to find veth1") -} - -func TestRefreshError(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - c := context.Background() - - showLink = func() ([]netlink.Link, error) { - return nil, errors.New("error") - } - - v := &EndpointWatcher{ - isRunning: true, - current: make(cache), - l: log.Logger().Named("veth-watcher"), - p: pubsub.New(), - } - - err := v.Refresh(c) - assert.Error(t, err, "Expected an error when refreshing veth cache") -} - -func TestListVethsError(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) - - showLink = func() ([]netlink.Link, error) { - return nil, errors.New("error") - } - - _, err := listVeths() - assert.Error(t, err, "Expected an error when listing veths") -} diff --git a/pkg/watchers/endpoint/endpoint_windows.go b/pkg/watchers/endpoint/endpoint_windows.go deleted file mode 100644 index 0a12ff9694..0000000000 --- a/pkg/watchers/endpoint/endpoint_windows.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -/* Template */ - -package endpoint - -import ( - "github.com/Microsoft/hcsshim/hcn" -) - -func (e *EndpointWatcher) initNewCache() error { - return nil -} - -func listVeths() ([]hcn.HostComputeEndpoint, error) { - return nil, nil -} diff --git a/pkg/watchers/endpoint/types.go b/pkg/watchers/endpoint/types.go index 693a4c79a7..652f1eb192 100644 --- a/pkg/watchers/endpoint/types.go +++ b/pkg/watchers/endpoint/types.go @@ -3,11 +3,32 @@ package endpoint +import ( + "github.com/microsoft/retina/pkg/log" + "github.com/microsoft/retina/pkg/pubsub" +) + const ( + watcherName = "endpoint-watcher" endpointCreated string = "endpoint_created" endpointDeleted string = "endpoint_deleted" ) +type Watcher struct { + l *log.ZapLogger + p pubsub.PubSubInterface +} + +// NewWatcher creates a new endpoint watcher. +func NewWatcher() *Watcher { + w := &Watcher{ + l: log.Logger().Named(watcherName), + p: pubsub.New(), + } + + return w +} + type key struct { name string hardwareAddr string diff --git a/pkg/watchers/endpoint/watcher_linux.go b/pkg/watchers/endpoint/watcher_linux.go new file mode 100644 index 0000000000..381da9ecb6 --- /dev/null +++ b/pkg/watchers/endpoint/watcher_linux.go @@ -0,0 +1,64 @@ +package endpoint + +import ( + "context" + "syscall" + + "github.com/microsoft/retina/pkg/common" + "github.com/pkg/errors" + "github.com/vishvananda/netlink" + "go.uber.org/zap" +) + +func (w *Watcher) Name() string { + return watcherName +} + +func (w *Watcher) Start(ctx context.Context) error { + w.l.Info("endpoint watcher started") + + // Create a channel to receive netlink events. + netlinkEvCh := make(chan netlink.LinkUpdate) + done := make(chan struct{}) + // Options for subscribing to link updates. We want to list existing links. + opt := netlink.LinkSubscribeOptions{ + ListExisting: true, + } + // Subscribe to link updates. + if err := netlink.LinkSubscribeWithOptions(netlinkEvCh, done, opt); err != nil { + return errors.Wrap(err, "failed to subscribe to link updates") + } + defer close(done) + + for { + select { + case <-ctx.Done(): + w.l.Info("stopping endpoint watcher") + return nil + case ev := <-netlinkEvCh: + // Filter for veth devices. + if ev.Link.Type() == "veth" { + veth := ev.Link.(*netlink.Veth) + switch ev.Header.Type { + case syscall.RTM_NEWLINK: + // Check if the veth device is up. + if veth.Attrs().OperState == netlink.OperUp { + w.l.Info("veth device is up", zap.String("veth", veth.Attrs().Name)) + w.p.Publish(common.PubSubEndpoints, NewEndpointEvent(EndpointCreated, *veth.Attrs())) + } + case syscall.RTM_DELLINK: + // Check if the veth device is down. + if veth.Attrs().OperState == netlink.OperDown { + w.l.Info("veth device is down", zap.String("veth", veth.Attrs().Name)) + w.p.Publish(common.PubSubEndpoints, NewEndpointEvent(EndpointDeleted, *veth.Attrs())) + } + } + } + } + } +} + +func (w *Watcher) Stop(_ context.Context) error { + w.l.Info("stopping veth watcher") + return nil +} diff --git a/pkg/watchers/endpoint/watcher_windows.go b/pkg/watchers/endpoint/watcher_windows.go new file mode 100644 index 0000000000..2116ed52a4 --- /dev/null +++ b/pkg/watchers/endpoint/watcher_windows.go @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/* Template */ + +package endpoint + +import ( + "context" +) + +func (w *Watcher) Name() string { + return watcherName +} + +func (w *Watcher) Start(_ context.Context) error { + // Not implemented for Windows + return nil +} + +func (w *Watcher) Stop(_ context.Context) error { + // Not implemented for Windows + return nil +} diff --git a/test/managers/filtermanager/main.go b/test/managers/filtermanager/main.go index 7de2a9fd89..8e01b5fd7c 100644 --- a/test/managers/filtermanager/main.go +++ b/test/managers/filtermanager/main.go @@ -19,7 +19,9 @@ import ( "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/watchers/apiserver" "github.com/microsoft/retina/pkg/watchers/endpoint" + "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func main() { @@ -34,17 +36,23 @@ func main() { metrics.InitializeMetrics() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // watcher manager wm := watchermanager.NewWatcherManager() - wm.Watchers = []watchermanager.IWatcher{endpoint.Watcher(), apiserver.Watcher()} + wm.Watchers = []watchermanager.Watcher{endpoint.NewWatcher(), apiserver.NewWatcher()} - err := wm.Start(ctx) - if err != nil { - l.Error("Failed to start endpoint watcher", zap.Error(err)) - panic(err) - } + g, ctx := errgroup.WithContext(ctx) + // Start watcher manager + g.Go(func() error { + err := wm.Start(ctx) + if err != nil { + l.Error("watcher manager exited with error", zap.Error(err)) + return errors.Wrap(err, "watcher manager exited with error") + } + return nil + }) defer func() { if err := wm.Stop(ctx); err != nil { l.Error("Failed to stop endpoint watcher", zap.Error(err)) diff --git a/test/plugin/packetparser/main_linux.go b/test/plugin/packetparser/main_linux.go index 2596dd1fe7..1515344875 100644 --- a/test/plugin/packetparser/main_linux.go +++ b/test/plugin/packetparser/main_linux.go @@ -33,7 +33,7 @@ func main() { // watcher manager wm := watchermanager.NewWatcherManager() - wm.Watchers = []watchermanager.IWatcher{endpoint.Watcher()} + wm.Watchers = []watchermanager.Watcher{endpoint.NewWatcher()} err := wm.Start(ctxTimeout) if err != nil { diff --git a/test/watchers/apiserver/main.go b/test/watchers/apiserver/main.go index 4703e12922..fc0c7801be 100644 --- a/test/watchers/apiserver/main.go +++ b/test/watchers/apiserver/main.go @@ -36,7 +36,7 @@ func main() { }() // watcher manager wm := watchermanager.NewWatcherManager() - wm.Watchers = []watchermanager.IWatcher{apiserver.Watcher()} + wm.Watchers = []watchermanager.Watcher{apiserver.NewWatcher()} // apiserver watcher. err = wm.Start(ctx) diff --git a/test/watchers/veth/main.go b/test/watchers/veth/main.go index 50a00c1c01..8a5a5cae38 100644 --- a/test/watchers/veth/main.go +++ b/test/watchers/veth/main.go @@ -24,7 +24,7 @@ func main() { // watcher manager wm := watchermanager.NewWatcherManager() - wm.Watchers = []watchermanager.IWatcher{endpoint.Watcher()} + wm.Watchers = []watchermanager.Watcher{endpoint.NewWatcher()} err := wm.Start(ctx) if err != nil {