Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enablement of Retina on ACI #1258

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 67 additions & 48 deletions cmd/legacy/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,16 @@
}

daemonConfig, err := config.GetConfig(d.configFile)
fmt.Printf("config %+v\n", daemonConfig)

if err != nil {
panic(err)
}

fmt.Println("init client-go")
var cfg *rest.Config
if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
var kubeconfig = os.Getenv("KUBECONFIG")
if kubeconfig != "" {
fmt.Println("KUBECONFIG set, using kubeconfig: ", kubeconfig)
cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
Expand All @@ -104,7 +107,8 @@
} else {
cfg, err = kcfg.GetConfig()
if err != nil {
panic(err)
fmt.Println("KUBECONFIG not set. Falling back to standalone mode")
cfg = loadStandaloneConfig()
}
}

Expand Down Expand Up @@ -208,6 +212,11 @@
}
}

// Setup RetinaEndpoint controller.
// TODO(mainred): This is to temporarily create a cache and pubsub for RetinaEndpoint, need to refactor this.
ctx := ctrl.SetupSignalHandler()
ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime")))

mgr, err := crmgr.New(cfg, mgrOption)
if err != nil {
mainLogger.Error("Unable to start manager", zap.Error(err))
Expand All @@ -226,18 +235,15 @@
// k8s Client used for informers
cl := kubernetes.NewForConfigOrDie(mgr.GetConfig())

serverVersion, err := cl.Discovery().ServerVersion()
if err != nil {
mainLogger.Error("failed to get Kubernetes server version: ", zap.Error(err))
} else {
mainLogger.Infof("Kubernetes server version: %v", serverVersion)
if kubeconfig != "" {
serverVersion, err := cl.Discovery().ServerVersion()

Check failure on line 239 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

shadow: declaration of "err" shadows declaration at line 91 (govet)

Check failure on line 239 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

shadow: declaration of "err" shadows declaration at line 91 (govet)

Check failure on line 239 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

shadow: declaration of "err" shadows declaration at line 91 (govet)

Check failure on line 239 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

shadow: declaration of "err" shadows declaration at line 91 (govet)
if err != nil {
mainLogger.Error("failed to get Kubernetes server version: ", zap.Error(err))
} else {
mainLogger.Infof("Kubernetes server version: %v", serverVersion)
}
}

// Setup RetinaEndpoint controller.
// TODO(mainred): This is to temporarily create a cache and pubsub for RetinaEndpoint, need to refactor this.
ctx := ctrl.SetupSignalHandler()
ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime")))

if daemonConfig.EnablePodLevel {
pubSub := pubsub.New()
controllerCache := controllercache.New(pubSub)
Expand All @@ -251,47 +257,49 @@
enrich.Run()
metricsModule := mm.InitModule(ctx, daemonConfig, pubSub, enrich, fm, controllerCache)

if !daemonConfig.RemoteContext {
mainLogger.Info("Initializing Pod controller")

podController := pc.New(mgr.GetClient(), controllerCache)
if err := podController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create PodController", zap.Error(err))
if cfg != nil {
if !daemonConfig.RemoteContext {
mainLogger.Info("Initializing Pod controller")

podController := pc.New(mgr.GetClient(), controllerCache)
if err := podController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create PodController", zap.Error(err))
}
} else if daemonConfig.EnableRetinaEndpoint {
mainLogger.Info("RetinaEndpoint is enabled")
mainLogger.Info("Initializing RetinaEndpoint controller")

retinaEndpointController := kec.New(mgr.GetClient(), controllerCache)
if err := retinaEndpointController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create retinaEndpointController", zap.Error(err))
}
}
} else if daemonConfig.EnableRetinaEndpoint {
mainLogger.Info("RetinaEndpoint is enabled")
mainLogger.Info("Initializing RetinaEndpoint controller")

retinaEndpointController := kec.New(mgr.GetClient(), controllerCache)
if err := retinaEndpointController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create retinaEndpointController", zap.Error(err))
mainLogger.Info("Initializing Node controller")
nodeController := nc.New(mgr.GetClient(), controllerCache)
if err := nodeController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create nodeController", zap.Error(err))
}
}

mainLogger.Info("Initializing Node controller")
nodeController := nc.New(mgr.GetClient(), controllerCache)
if err := nodeController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create nodeController", zap.Error(err))
}

mainLogger.Info("Initializing Service controller")
svcController := sc.New(mgr.GetClient(), controllerCache)
if err := svcController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create svcController", zap.Error(err))
}

if daemonConfig.EnableAnnotations {
mainLogger.Info("Initializing MetricsConfig namespaceController")
namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule)
if err := namespaceController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create namespaceController", zap.Error(err))
mainLogger.Info("Initializing Service controller")
svcController := sc.New(mgr.GetClient(), controllerCache)
if err := svcController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create svcController", zap.Error(err))
}
go namespaceController.Start(ctx)
} else {
mainLogger.Info("Initializing MetricsConfig controller")
metricsConfigController := mcc.New(mgr.GetClient(), mgr.GetScheme(), metricsModule)
if err := metricsConfigController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))

if daemonConfig.EnableAnnotations {
mainLogger.Info("Initializing MetricsConfig namespaceController")
namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule)
if err := namespaceController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create namespaceController", zap.Error(err))
}
go namespaceController.Start(ctx)
} else {
mainLogger.Info("Initializing MetricsConfig controller")
metricsConfigController := mcc.New(mgr.GetClient(), mgr.GetScheme(), metricsModule)
if err := metricsConfigController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))
}
}
}
}
Expand Down Expand Up @@ -323,3 +331,14 @@
mainLogger.Info("Network observability exiting. Till next time!")
return nil
}

func loadStandaloneConfig() *rest.Config {
return &rest.Config{
Host: "http://localhost:8080",
APIPath: "/api",
ContentConfig: rest.ContentConfig{
GroupVersion: nil,
NegotiatedSerializer: nil,
},
}
}
21 changes: 21 additions & 0 deletions pkg/enricher/enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/microsoft/retina/pkg/common"
"github.com/microsoft/retina/pkg/controllers/cache"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/utils"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -186,6 +187,26 @@ func (e *Enricher) Write(ev *v1.Event) {
e.inputRing.Write(ev)
}

func (e *Enricher) GetWindowLabels(ip string) *utils.LabelsInfo {
obj := e.cache.GetObjByIP(ip)
if obj == nil {
e.l.Debug("No object found for IP", zap.String("ip", ip))
return nil
}

switch o := obj.(type) {
case *common.RetinaEndpoint:
return &utils.LabelsInfo{
Namespace: o.Namespace(),
PodName: o.Name(),
}

default:
e.l.Debug("received unknown type from cache", zap.Any("obj", obj), zap.Any("type", reflect.TypeOf(obj)))
return nil
}
}

func (e *Enricher) ExportReader() *container.RingReader {
return container.NewRingReader(e.outputRing, e.outputRing.OldestWrite())
}
5 changes: 5 additions & 0 deletions pkg/hubble/common/mocks/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/managers/pluginmanager/pluginmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func NewPluginManager(cfg *kcfg.Config, tel telemetry.Telemetry) (*PluginManager
}

for _, name := range cfg.EnabledPlugin {
fmt.Printf("Enabled plugins : %+v\n", name)

newPluginFn, ok := plugin.Get(name)
if !ok {
return nil, fmt.Errorf("plugin %s not found in registry", name)
Expand Down
Binary file modified pkg/plugin/conntrack/conntrack_bpfel_x86.o
Binary file not shown.
Binary file modified pkg/plugin/dropreason/kprobe_bpfel_x86.o
Binary file not shown.
Binary file modified pkg/plugin/filter/filter_bpfel_x86.o
Binary file not shown.
117 changes: 117 additions & 0 deletions pkg/plugin/hnsstats/hnsstats_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,27 @@
import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/Microsoft/hcsshim"
"github.com/Microsoft/hcsshim/hcn"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
kcfg "github.com/microsoft/retina/pkg/config"
"github.com/microsoft/retina/pkg/exporter"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/metrics"
"github.com/microsoft/retina/pkg/plugin/registry"
"github.com/microsoft/retina/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

var (
AdvWindowsGauge *prometheus.GaugeVec
)

const (
initialize = iota + 1
start
Expand Down Expand Up @@ -85,6 +93,11 @@
}
h.endpointQuery.Filter = string(filter)

// if h.cfg.EnablePodLevel {
h.l.Info("Creating advanced HNS stats metrics")
initializeAdvMetrics()
// }

h.l.Info("Exiting hnsstats Init...")
return nil
}
Expand Down Expand Up @@ -144,6 +157,7 @@
if vfpcounters, err := parseVfpPortCounters(countersRaw); err == nil {
// Attach VFP port counters
hnsStatsData.vfpCounters = vfpcounters
hnsStatsData.Port = portguid
h.l.Debug("Attached VFP port counters", zap.String(zapPortField, portguid))
// h.l.Info(vfpcounters.String())
} else {
Expand All @@ -154,6 +168,7 @@
}

notifyHnsStats(h, hnsStatsData)
getAdvancedMetricLabels(h, hnsStatsData)
}
}
}
Expand Down Expand Up @@ -208,12 +223,114 @@
metrics.TCPFlagGauge.WithLabelValues(egressLabel, utils.RST).Set(float64(stats.vfpCounters.Out.TcpCounters.PacketCounters.RstPacketCount))
}

func getAdvancedMetricLabels(h *hnsstats, stats *HnsStatsData) {
if AdvWindowsGauge == nil {
h.l.Warn("Advanced windows metric is not initialized")
return
}
// if port is populated, vfp data exists
// labels := enricher.Instance().GetWindowLabels(stats.IPAddress)
h.l.Info("New standalone feature")

labels, err := getStandaloneLabels(stats.IPAddress)
h.l.Info("Reading fields from CNI state file", zap.String(Ip, stats.IPAddress), zap.String(PodName, labels.PodName), zap.String(Namespace, labels.Namespace))

if err != nil {
AdvWindowsGauge.WithLabelValues(PacketsReceived, stats.IPAddress, stats.Port, labels.Namespace, labels.PodName).Set(float64(stats.hnscounters.PacketsReceived))
AdvWindowsGauge.WithLabelValues(PacketsSent, stats.IPAddress, stats.Port, labels.Namespace, labels.PodName).Set(float64(stats.hnscounters.PacketsSent))
h.l.Info("updating advanced HNS stats metric", zap.String(PodName, labels.PodName), zap.String(Namespace, labels.Namespace))
}
}

func (h *hnsstats) Start(ctx context.Context) error {
h.l.Info("Start hnsstats plugin...")
h.state = start
return pullHnsStats(ctx, h)
}

func cleanAdvMetrics() {
exporter.UnregisterMetric(exporter.AdvancedRegistry, metrics.ToPrometheusType(AdvWindowsGauge))
}

func initializeAdvMetrics() {
if AdvWindowsGauge != nil {
cleanAdvMetrics()
}
AdvWindowsGauge = exporter.CreatePrometheusGaugeVecForMetric(
exporter.AdvancedRegistry,
AdvHNSStatsName,
AdvHNSStatsDescription,
utils.Direction,
Ip,
Port,
Namespace,
PodName,
)
}

type Endpoint struct {
ID string `json:"Id"`
IPAddresses []IPInfo `json:"IPAddresses"`
PODName string `json:"PODName"`
PODNameSpace string `json:"PODNameSpace"`
}

type IPInfo struct {
IP string `json:"IP"`
}

type Network struct {
ExternalInterfaces map[string]ExternalInterface `json:"ExternalInterfaces"`
}

type ExternalInterface struct {
Networks map[string]NetworkInfo `json:"Networks"`
}

type NetworkInfo struct {
Endpoints map[string]Endpoint `json:"Endpoints"`
}

func getStandaloneLabels(ip string) (*utils.LabelsInfo, error) {
file, err := os.Open("C:/k/azure-vnet.json")
if err != nil {
return &utils.LabelsInfo{
Namespace: "",
PodName: "",
}, fmt.Errorf("failed to open file: %w", err)
}
defer file.Close()

var network Network
if err := json.NewDecoder(file).Decode(&network); err != nil {
return &utils.LabelsInfo{
Namespace: "",
PodName: "",
}, fmt.Errorf("failed to decode JSON: %w", err)
}

// Search for the IP in Endpoints
for _, iface := range network.ExternalInterfaces {
for _, network := range iface.Networks {
for _, endpoint := range network.Endpoints {
for _, ipAddr := range endpoint.IPAddresses {
if ipAddr.IP == ip {
return &utils.LabelsInfo{
Namespace: endpoint.PODNameSpace,
PodName: endpoint.PODName,
}, nil
}
}
}
}
}

return &utils.LabelsInfo{
Namespace: "",
PodName: "",
}, fmt.Errorf("IP address not found")

Check failure on line 331 in pkg/plugin/hnsstats/hnsstats_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"IP address not found\")" (err113)

Check failure on line 331 in pkg/plugin/hnsstats/hnsstats_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"IP address not found\")" (err113)
}

func (d *hnsstats) Stop() error {
d.l.Info("Entered hnsstats Stop...")
if d.state != start {
Expand Down
Loading
Loading