Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: veth watcher #1237

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions pkg/managers/pluginmanager/pluginmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type PluginManager struct {
plugins map[string]plugin.Plugin
tel telemetry.Telemetry

watcherManager watchermanager.IWatcherManager
watcherManager watchermanager.Manager
}

func NewPluginManager(cfg *kcfg.Config, tel telemetry.Telemetry) (*PluginManager, error) {
Expand Down Expand Up @@ -126,17 +126,19 @@ func (p *PluginManager) Start(ctx context.Context) error {
return ErrZeroInterval
}

if p.cfg.EnablePodLevel {
p.l.Info("starting watchers")
g, ctx := errgroup.WithContext(ctx)

// Start watcher manager
if err := p.watcherManager.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start watcher manager")
}
if p.cfg.EnablePodLevel {
g.Go(func() error {
p.l.Info("starting watchers")
// Start watcher manager
if err = p.watcherManager.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start watcher manager")
}
return nil
})
}

g, ctx := errgroup.WithContext(ctx)

// run conntrack GC
ct, err := conntrack.New()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/managers/pluginmanager/pluginmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var (
}
)

func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockIWatcherManager) {
m = watchermock.NewMockIWatcherManager(ctl)
func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockManager) {
m = watchermock.NewMockManager(ctl)
m.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes()
m.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()
return
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestWatcherManagerFailure(t *testing.T) {
defer ctl.Finish()
log.SetupZapLogger(log.GetDefaultLogOpts())

m := watchermock.NewMockIWatcherManager(ctl)
m := watchermock.NewMockManager(ctl)
m.EXPECT().Start(gomock.Any()).Return(errors.New("error")).AnyTimes()

cfg := cfgPodLevelEnabled
Expand Down
88 changes: 44 additions & 44 deletions pkg/managers/watchermanager/mocks/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions pkg/managers/watchermanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,24 @@ package watchermanager

import (
"context"
"sync"
"time"

"github.com/microsoft/retina/pkg/log"
)

//go:generate go run go.uber.org/mock/[email protected] -source=types.go -destination=mocks/mock_types.go -package=mocks .
type IWatcher interface {
// Init, Stop, and Refresh should only be called by watchermanager.
Init(ctx context.Context) error
type Watcher interface {
// Start and Stop should only be called by watchermanager.
Start(ctx context.Context) error
Stop(ctx context.Context) error
Refresh(ctx context.Context) error
Name() string
}

type IWatcherManager interface {
type Manager interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
}

type WatcherManager struct {
Watchers []IWatcher
l *log.ZapLogger
refreshRate time.Duration
cancel context.CancelFunc
wg sync.WaitGroup
Watchers []Watcher
l *log.ZapLogger
}
69 changes: 24 additions & 45 deletions pkg/managers/watchermanager/watchermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ package watchermanager

import (
"context"
"fmt"
"time"

"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/watchers/apiserver"
"github.com/microsoft/retina/pkg/watchers/endpoint"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -21,61 +22,39 @@ const (

func NewWatcherManager() *WatcherManager {
return &WatcherManager{
Watchers: []IWatcher{
endpoint.Watcher(),
apiserver.Watcher(),
Watchers: []Watcher{
apiserver.NewWatcher(),
endpoint.NewWatcher(),
},
l: log.Logger().Named("watcher-manager"),
refreshRate: DefaultRefreshRate,
l: log.Logger().Named("watcher-manager"),
}
}

func (wm *WatcherManager) Start(ctx context.Context) error {
newCtx, cancelCtx := context.WithCancel(ctx)
wm.cancel = cancelCtx

wm.l.Info("starting watcher manager")
// start all watchers
g, ctx := errgroup.WithContext(ctx)
for _, w := range wm.Watchers {
if err := w.Init(ctx); err != nil {
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
return err
}
wm.wg.Add(1)
go wm.runWatcher(newCtx, w)
wm.l.Info("watcher started", zap.String("watcher_type", fmt.Sprintf("%T", w)))
w := w
g.Go(func() error {
wm.l.Info("starting watcher", zap.String("name", w.Name()))
err := w.Start(ctx)
if err != nil {
wm.l.Error("watcher exited with error", zap.Error(err), zap.String("name", w.Name()))
return errors.Wrap(err, "watcher exited with error")
}
return nil
})
}
err := g.Wait()
if err != nil {
wm.l.Error("watcher manager exited with error", zap.Error(err))
return errors.Wrap(err, "watcher manager exited with error")
}
return nil
}

func (wm *WatcherManager) Stop(ctx context.Context) error {
if wm.cancel != nil {
wm.cancel() // cancel all runWatcher
}
for _, w := range wm.Watchers {
if err := w.Stop(ctx); err != nil {
wm.l.Error("failed to stop", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
return err
}
}
wm.wg.Wait() // wait for all runWatcher to stop
wm.l.Info("watcher manager stopped")
return nil
}

func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error {
defer wm.wg.Done() // signal that this runWatcher is done
ticker := time.NewTicker(wm.refreshRate)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
wm.l.Info("watcher stopping...", zap.String("watcher_type", fmt.Sprintf("%T", w)))
return nil
case <-ticker.C:
err := w.Refresh(ctx)
if err != nil {
wm.l.Error("refresh failed", zap.Error(err))
return err
}
}
}
}
Loading
Loading