Skip to content

Commit

Permalink
feat: veth watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
nddq committed Jan 16, 2025
1 parent 2bdd493 commit 9fc3202
Show file tree
Hide file tree
Showing 20 changed files with 409 additions and 836 deletions.
2 changes: 1 addition & 1 deletion pkg/managers/pluginmanager/pluginmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type PluginManager struct {
plugins map[string]plugin.Plugin
tel telemetry.Telemetry

watcherManager watchermanager.IWatcherManager
watcherManager watchermanager.Manager
}

func NewPluginManager(cfg *kcfg.Config, tel telemetry.Telemetry) (*PluginManager, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/managers/pluginmanager/pluginmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ var (
}
)

func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockIWatcherManager) {
m = watchermock.NewMockIWatcherManager(ctl)
func setupWatcherManagerMock(ctl *gomock.Controller) (m *watchermock.MockManager) {
m = watchermock.NewMockManager(ctl)
m.EXPECT().Start(gomock.Any()).Return(nil).AnyTimes()
m.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()
return
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestWatcherManagerFailure(t *testing.T) {
defer ctl.Finish()
log.SetupZapLogger(log.GetDefaultLogOpts())

m := watchermock.NewMockIWatcherManager(ctl)
m := watchermock.NewMockManager(ctl)
m.EXPECT().Start(gomock.Any()).Return(errors.New("error")).AnyTimes()

cfg := cfgPodLevelEnabled
Expand Down
88 changes: 44 additions & 44 deletions pkg/managers/watchermanager/mocks/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 7 additions & 12 deletions pkg/managers/watchermanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,24 @@ package watchermanager

import (
"context"
"sync"
"time"

"github.com/microsoft/retina/pkg/log"
)

//go:generate go run go.uber.org/mock/[email protected] -source=types.go -destination=mocks/mock_types.go -package=mocks .
type IWatcher interface {
// Init, Stop, and Refresh should only be called by watchermanager.
Init(ctx context.Context) error
type Watcher interface {
// Start and Stop should only be called by watchermanager.
Start(ctx context.Context) error
Stop(ctx context.Context) error
Refresh(ctx context.Context) error
Name() string
}

type IWatcherManager interface {
type Manager interface {
Start(ctx context.Context) error
Stop(ctx context.Context) error
}

type WatcherManager struct {
Watchers []IWatcher
l *log.ZapLogger
refreshRate time.Duration
cancel context.CancelFunc
wg sync.WaitGroup
Watchers []Watcher
l *log.ZapLogger
}
69 changes: 24 additions & 45 deletions pkg/managers/watchermanager/watchermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ package watchermanager

import (
"context"
"fmt"
"time"

"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/watchers/apiserver"
"github.com/microsoft/retina/pkg/watchers/endpoint"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand All @@ -21,61 +22,39 @@ const (

func NewWatcherManager() *WatcherManager {
return &WatcherManager{
Watchers: []IWatcher{
endpoint.Watcher(),
apiserver.Watcher(),
Watchers: []Watcher{
apiserver.NewWatcher(),
endpoint.NewWatcher(),
},
l: log.Logger().Named("watcher-manager"),
refreshRate: DefaultRefreshRate,
l: log.Logger().Named("watcher-manager"),
}
}

func (wm *WatcherManager) Start(ctx context.Context) error {
newCtx, cancelCtx := context.WithCancel(ctx)
wm.cancel = cancelCtx

wm.l.Info("starting watcher manager")
// start all watchers
g, ctx := errgroup.WithContext(ctx)
for _, w := range wm.Watchers {
if err := w.Init(ctx); err != nil {
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
return err
}
wm.wg.Add(1)
go wm.runWatcher(newCtx, w)
wm.l.Info("watcher started", zap.String("watcher_type", fmt.Sprintf("%T", w)))
w := w
g.Go(func() error {
wm.l.Info("starting watcher", zap.String("name", w.Name()))
err := w.Start(ctx)
if err != nil {
wm.l.Error("watcher exited with error", zap.Error(err), zap.String("name", w.Name()))
return errors.Wrap(err, "watcher exited with error")
}
return nil
})
}
err := g.Wait()
if err != nil {
wm.l.Error("watcher manager exited with error", zap.Error(err))
return errors.Wrap(err, "watcher manager exited with error")
}
return nil
}

func (wm *WatcherManager) Stop(ctx context.Context) error {
if wm.cancel != nil {
wm.cancel() // cancel all runWatcher
}
for _, w := range wm.Watchers {
if err := w.Stop(ctx); err != nil {
wm.l.Error("failed to stop", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
return err
}
}
wm.wg.Wait() // wait for all runWatcher to stop
wm.l.Info("watcher manager stopped")
return nil
}

func (wm *WatcherManager) runWatcher(ctx context.Context, w IWatcher) error {
defer wm.wg.Done() // signal that this runWatcher is done
ticker := time.NewTicker(wm.refreshRate)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
wm.l.Info("watcher stopping...", zap.String("watcher_type", fmt.Sprintf("%T", w)))
return nil
case <-ticker.C:
err := w.Refresh(ctx)
if err != nil {
wm.l.Error("refresh failed", zap.Error(err))
return err
}
}
}
}
Loading

0 comments on commit 9fc3202

Please sign in to comment.