Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: make retina run in standalone mode #1256

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 98 additions & 65 deletions cmd/legacy/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package legacy

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -28,6 +29,8 @@
"github.com/go-logr/zapr"
retinav1alpha1 "github.com/microsoft/retina/crd/api/v1alpha1"
"github.com/microsoft/retina/internal/buildinfo"
"github.com/microsoft/retina/pkg/bpf"

Check failure on line 32 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

could not import github.com/microsoft/retina/pkg/bpf (-: build constraints exclude all Go files in pkg/bpf) (typecheck)

Check failure on line 32 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

could not import github.com/microsoft/retina/pkg/bpf (-: build constraints exclude all Go files in pkg/bpf) (typecheck)
"github.com/microsoft/retina/pkg/ciliumfs"

Check failure on line 33 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

could not import github.com/microsoft/retina/pkg/ciliumfs (-: build constraints exclude all Go files in pkg/ciliumfs) (typecheck)

Check failure on line 33 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

could not import github.com/microsoft/retina/pkg/ciliumfs (-: build constraints exclude all Go files in pkg/ciliumfs) (typecheck)
"github.com/microsoft/retina/pkg/config"
controllercache "github.com/microsoft/retina/pkg/controllers/cache"
mcc "github.com/microsoft/retina/pkg/controllers/daemon/metricsconfiguration"
Expand All @@ -36,6 +39,7 @@
pc "github.com/microsoft/retina/pkg/controllers/daemon/pod"
kec "github.com/microsoft/retina/pkg/controllers/daemon/retinaendpoint"
sc "github.com/microsoft/retina/pkg/controllers/daemon/service"
"github.com/pkg/errors"

"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
Expand Down Expand Up @@ -90,7 +94,7 @@

daemonConfig, err := config.GetConfig(d.configFile)
if err != nil {
panic(err)
println("failed to get config: ", err)
}

fmt.Println("init client-go")
Expand All @@ -104,10 +108,15 @@
} else {
cfg, err = kcfg.GetConfig()
if err != nil {
panic(err)
println("failed to get config: ")
}
}

if cfg == nil {
cfg = &rest.Config{
Host: daemonConfig.APIServer.Host,
}
}
fmt.Println("api server: ", cfg.Host)

fmt.Println("init logger")
Expand Down Expand Up @@ -176,7 +185,7 @@
}

// Local context has its meaning only when pod level(advanced) metrics is enabled.
if daemonConfig.EnablePodLevel && !daemonConfig.RemoteContext {
if !daemonConfig.Standalone && daemonConfig.EnablePodLevel && !daemonConfig.RemoteContext {
mainLogger.Info("Remote context is disabled, only pods deployed on the same node as retina-agent will be monitored")
// the new cache sets Selector options on the Manager cache which are used
// to perform *server-side* filtering of the cached objects. This is very important
Expand Down Expand Up @@ -208,35 +217,54 @@
}
}

mgr, err := crmgr.New(cfg, mgrOption)
if err != nil {
mainLogger.Error("Unable to start manager", zap.Error(err))
return fmt.Errorf("creating controller-runtime manager: %w", err)
}
var ctx context.Context
var mgr crmgr.Manager
var cl *kubernetes.Clientset

//+kubebuilder:scaffold:builder
if !daemonConfig.Standalone {
mgr, err := crmgr.New(cfg, mgrOption)

Check failure on line 225 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

shadow: declaration of "mgr" shadows declaration at line 221 (govet)

Check failure on line 225 in cmd/legacy/daemon.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

shadow: declaration of "mgr" shadows declaration at line 221 (govet)
if err != nil {
mainLogger.Error("Unable to start manager", zap.Error(err))
return fmt.Errorf("creating controller-runtime manager: %w", err)
}

if healthCheckErr := mgr.AddHealthzCheck("healthz", healthz.Ping); healthCheckErr != nil {
mainLogger.Fatal("Unable to set up health check", zap.Error(healthCheckErr))
}
if addReadyCheckErr := mgr.AddReadyzCheck("readyz", healthz.Ping); addReadyCheckErr != nil {
mainLogger.Fatal("Unable to set up ready check", zap.Error(addReadyCheckErr))
}
//+kubebuilder:scaffold:builder

// k8s Client used for informers
cl := kubernetes.NewForConfigOrDie(mgr.GetConfig())
if healthCheckErr := mgr.AddHealthzCheck("healthz", healthz.Ping); healthCheckErr != nil {
mainLogger.Fatal("Unable to set up health check", zap.Error(healthCheckErr))
}
if addReadyCheckErr := mgr.AddReadyzCheck("readyz", healthz.Ping); addReadyCheckErr != nil {
mainLogger.Fatal("Unable to set up ready check", zap.Error(addReadyCheckErr))
}

serverVersion, err := cl.Discovery().ServerVersion()
if err != nil {
mainLogger.Error("failed to get Kubernetes server version: ", zap.Error(err))
// k8s Client used for informers
cl = kubernetes.NewForConfigOrDie(mgr.GetConfig())

serverVersion, err := cl.Discovery().ServerVersion()
if err != nil {
mainLogger.Error("failed to get Kubernetes server version: ", zap.Error(err))
} else {
mainLogger.Infof("Kubernetes server version: %v", serverVersion)
}

// Setup RetinaEndpoint controller.
// TODO(mainred): This is to temporarily create a cache and pubsub for RetinaEndpoint, need to refactor this.
ctx = ctrl.SetupSignalHandler()
ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime")))
} else {
mainLogger.Infof("Kubernetes server version: %v", serverVersion)
}
ctx = context.WithoutCancel(context.Background())
// Setup BPF
err = bpf.Setup(mainLogger.Desugar())
if err != nil {
return errors.Wrap(err, "failed to setup Retina bpf filesystem")
}

// Setup RetinaEndpoint controller.
// TODO(mainred): This is to temporarily create a cache and pubsub for RetinaEndpoint, need to refactor this.
ctx := ctrl.SetupSignalHandler()
ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime")))
// Setup CiliumFS.
err = ciliumfs.Setup(mainLogger.Desugar())
if err != nil {
return errors.Wrap(err, "failed to setup CiliumFS")
}
}

if daemonConfig.EnablePodLevel {
pubSub := pubsub.New()
Expand All @@ -251,52 +279,55 @@
enrich.Run()
metricsModule := mm.InitModule(ctx, daemonConfig, pubSub, enrich, fm, controllerCache)

if !daemonConfig.RemoteContext {
mainLogger.Info("Initializing Pod controller")
if !daemonConfig.Standalone {

podController := pc.New(mgr.GetClient(), controllerCache)
if err := podController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create PodController", zap.Error(err))
}
} else if daemonConfig.EnableRetinaEndpoint {
mainLogger.Info("RetinaEndpoint is enabled")
mainLogger.Info("Initializing RetinaEndpoint controller")
if !daemonConfig.RemoteContext {
mainLogger.Info("Initializing Pod controller")

retinaEndpointController := kec.New(mgr.GetClient(), controllerCache)
if err := retinaEndpointController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create retinaEndpointController", zap.Error(err))
}
}
podController := pc.New(mgr.GetClient(), controllerCache)
if err := podController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create PodController", zap.Error(err))
}
} else if daemonConfig.EnableRetinaEndpoint {
mainLogger.Info("RetinaEndpoint is enabled")
mainLogger.Info("Initializing RetinaEndpoint controller")

mainLogger.Info("Initializing Node controller")
nodeController := nc.New(mgr.GetClient(), controllerCache)
if err := nodeController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create nodeController", zap.Error(err))
}
retinaEndpointController := kec.New(mgr.GetClient(), controllerCache)
if err := retinaEndpointController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create retinaEndpointController", zap.Error(err))
}
}

mainLogger.Info("Initializing Service controller")
svcController := sc.New(mgr.GetClient(), controllerCache)
if err := svcController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create svcController", zap.Error(err))
}
mainLogger.Info("Initializing Node controller")
nodeController := nc.New(mgr.GetClient(), controllerCache)
if err := nodeController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create nodeController", zap.Error(err))
}

if daemonConfig.EnableAnnotations {
mainLogger.Info("Initializing MetricsConfig namespaceController")
namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule)
if err := namespaceController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create namespaceController", zap.Error(err))
mainLogger.Info("Initializing Service controller")
svcController := sc.New(mgr.GetClient(), controllerCache)
if err := svcController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create svcController", zap.Error(err))
}
go namespaceController.Start(ctx)
} else {
mainLogger.Info("Initializing MetricsConfig controller")
metricsConfigController := mcc.New(mgr.GetClient(), mgr.GetScheme(), metricsModule)
if err := metricsConfigController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))

if daemonConfig.EnableAnnotations {
mainLogger.Info("Initializing MetricsConfig namespaceController")
namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule)
if err := namespaceController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create namespaceController", zap.Error(err))
}
go namespaceController.Start(ctx)
} else {
mainLogger.Info("Initializing MetricsConfig controller")
metricsConfigController := mcc.New(mgr.GetClient(), mgr.GetScheme(), metricsModule)
if err := metricsConfigController.SetupWithManager(mgr); err != nil {
mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))
}
}
}
}

controllerMgr, err := cm.NewControllerManager(daemonConfig, cl, tel)
controllerMgr, err := cm.NewControllerManager(daemonConfig, daemonConfig.Standalone, cl, tel)
if err != nil {
mainLogger.Fatal("Failed to create controller manager", zap.Error(err))
}
Expand All @@ -315,9 +346,11 @@
go controllerMgr.Start(ctx)
mainLogger.Info("Started controller manager")

// Start all registered controllers. This will block until container receives SIGTERM.
if err := mgr.Start(ctx); err != nil {
mainLogger.Fatal("unable to start manager", zap.Error(err))
if !daemonConfig.Standalone {
// Start all registered controllers. This will block until container receives SIGTERM.
if err := mgr.Start(ctx); err != nil {
mainLogger.Fatal("unable to start manager", zap.Error(err))
}
}

mainLogger.Info("Network observability exiting. Till next time!")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
BypassLookupIPOfInterest bool `yaml:"bypassLookupIPOfInterest"`
DataAggregationLevel Level `yaml:"dataAggregationLevel"`
MonitorSockPath string `yaml:"monitorSockPath"`
Standalone bool `yaml:"standalone"`
}

func GetConfig(cfgFilename string) (*Config, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/managers/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type Controller struct {
enricher *enricher.Enricher
}

func NewControllerManager(conf *kcfg.Config, kubeclient kubernetes.Interface, tel telemetry.Telemetry) (*Controller, error) {
func NewControllerManager(conf *kcfg.Config, standalone bool, kubeclient kubernetes.Interface, tel telemetry.Telemetry) (*Controller, error) {
cmLogger := log.Logger().Named("controller-manager")

if conf.EnablePodLevel {
if conf.EnablePodLevel && !standalone {
// informer factory for pods/services
factory := informers.NewSharedInformerFactory(kubeclient, ResyncTime)
factory.WaitForCacheSync(wait.NeverStop)
Expand Down
8 changes: 4 additions & 4 deletions pkg/managers/controllermanager/controllermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestNewControllerManager(t *testing.T) {

log.SetupZapLogger(log.GetDefaultLogOpts())
kubeclient := k8sfake.NewSimpleClientset()
cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry())
cm, err := NewControllerManager(c, false, kubeclient, telemetry.NewNoopTelemetry())
assert.NoError(t, err, "Expected no error, instead got %+v", err)
assert.NotNil(t, cm)
}
Expand All @@ -45,7 +45,7 @@ func TestNewControllerManagerWin(t *testing.T) {

log.SetupZapLogger(log.GetDefaultLogOpts())
kubeclient := k8sfake.NewSimpleClientset()
cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry())
cm, err := NewControllerManager(c, false, kubeclient, telemetry.NewNoopTelemetry())
assert.Error(t, err, "Expected error of not recognising windows plugins in linux, instead got no error")
assert.Nil(t, cm)
}
Expand All @@ -57,7 +57,7 @@ func TestNewControllerManagerInit(t *testing.T) {

log.SetupZapLogger(log.GetDefaultLogOpts())
kubeclient := k8sfake.NewSimpleClientset()
cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry())
cm, err := NewControllerManager(c, false, kubeclient, telemetry.NewNoopTelemetry())
assert.NoError(t, err, "Expected no error, instead got %+v", err)
assert.NotNil(t, cm)

Expand All @@ -72,7 +72,7 @@ func TestControllerPluginManagerStartFail(t *testing.T) {

log.SetupZapLogger(log.GetDefaultLogOpts())
kubeclient := k8sfake.NewSimpleClientset()
cm, err := NewControllerManager(c, kubeclient, telemetry.NewNoopTelemetry())
cm, err := NewControllerManager(c, false, kubeclient, telemetry.NewNoopTelemetry())
assert.NoError(t, err, "Expected no error, instead got %+v", err)
assert.NotNil(t, cm)

Expand Down
2 changes: 2 additions & 0 deletions pkg/plugin/conntrack/_cprog/conntrack.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct packet
__u8 proto;
__u8 flags; // For TCP packets, this is the TCP flags. For UDP packets, this will always be 1 for conntrack purposes.
bool is_reply;
__u32 pid;
char comm[TASK_COMM_LEN];
struct conntrackmetadata conntrack_metadata;
};

Expand Down
12 changes: 12 additions & 0 deletions pkg/plugin/conntrack/_cprog/conntrack.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,15 @@
#define OBSERVATION_POINT_TO_ENDPOINT 0x01
#define OBSERVATION_POINT_FROM_NETWORK 0x02
#define OBSERVATION_POINT_TO_NETWORK 0x03

/*
 * TASK_COMM_LEN is the length of the task command name. The enum below is
 * pulled from the kernel's sched.h:
 * https://elixir.bootlin.com/linux/v6.12/source/include/linux/sched.h#L306
 *
 * The length is defined as an enum so that it is visible to BPF programs.
 */
enum {
TASK_COMM_LEN = 16,
};
Binary file modified pkg/plugin/conntrack/conntrack_bpfel_x86.o
Binary file not shown.
Binary file modified pkg/plugin/dropreason/kprobe_bpfel_x86.o
Binary file not shown.
Binary file modified pkg/plugin/filter/filter_bpfel_x86.o
Binary file not shown.
Loading
Loading