Try to do smarter handling of leader election #109

Merged: 6 commits, Mar 6, 2025
Changes from all commits
8 changes: 8 additions & 0 deletions internal/cmd/api/api.go
@@ -197,6 +197,14 @@ func run(ctx context.Context, cfg *Config, log logrus.FieldLogger) error {
if err != nil {
return fmt.Errorf("creating k8s clients: %w", err)
}
for a, b := range cfg.ReplaceEnvironmentNames {
if v, ok := k8sClients[a]; ok {
k8sClients[b] = v
delete(k8sClients, a)
}
}

log.WithField("envs", len(k8sClients)).Info("Start event watcher")
eventWatcher := event.NewWatcher(pool, k8sClients, log)
go eventWatcher.Run(ctx)
}
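
For illustration, a minimal standalone sketch of the re-keying loop above (the environment names and the string stand-ins for the clients are hypothetical; in api.go the map values are kubernetes.Interface clients):

package main

import "fmt"

func main() {
	// Hypothetical example values.
	k8sClients := map[string]string{"dev-gcp": "client-a", "prod-gcp": "client-b"}
	replaceEnvironmentNames := map[string]string{"dev-gcp": "dev"} // old name -> new name

	// Same re-keying as in run(): move each client from its old environment name to the new one.
	for oldName, newName := range replaceEnvironmentNames {
		if client, ok := k8sClients[oldName]; ok {
			k8sClients[newName] = client
			delete(k8sClients, oldName)
		}
	}

	fmt.Println(k8sClients) // map[dev:client-a prod-gcp:client-b]
}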
88 changes: 79 additions & 9 deletions internal/kubernetes/event/watcher.go
@@ -14,7 +14,7 @@ import (
eventsql "github.com/nais/api/internal/kubernetes/event/searchsql"
"github.com/nais/api/internal/leaderelection"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"github.com/sourcegraph/conc/pool"
eventv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
@@ -26,40 +26,105 @@ type Watcher struct {
clients map[string]kubernetes.Interface
events chan eventsql.UpsertParams
log logrus.FieldLogger
wg *pool.ContextPool

// state receives true when the watcher should be started/continue running and false when it should stop.
state []chan bool
}

func NewWatcher(pool *pgxpool.Pool, clients map[string]kubernetes.Interface, log logrus.FieldLogger) *Watcher {
chs := make([]chan bool, 0, len(clients))
for range clients {
chs = append(chs, make(chan bool, 1))
}
return &Watcher{
clients: clients,
events: make(chan eventsql.UpsertParams, 20),
queries: eventsql.New(pool),
log: log,
state: chs,
}
}

func (w *Watcher) Run(ctx context.Context) {
wg, ctx := errgroup.WithContext(ctx)
for env, client := range w.clients {
wg.Go(func() error {
return w.run(ctx, env, client)
})
w.wg = pool.New().WithErrors().WithContext(ctx)

leaderelection.RegisterOnStartedLeading(w.onStartedLeading)
leaderelection.RegisterOnStoppedLeading(w.onStoppedLeading)
if leaderelection.IsLeader() {
w.onStartedLeading(ctx)
}

wg.Go(func() error {
w.wg.Go(func(ctx context.Context) error {
return w.batchInsert(ctx)
})

if err := wg.Wait(); err != nil {
i := 0
for env, client := range w.clients {
ch := w.state[i]
i++
w.wg.Go(func(ctx context.Context) error {
return w.run(ctx, env, client, ch)
})
}

if err := w.wg.Wait(); err != nil {
w.log.WithError(err).Error("error running events watcher")
}
}
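
The PR swaps golang.org/x/sync/errgroup for github.com/sourcegraph/conc/pool. As a rough standalone sketch of the ContextPool calls used in Run above (the tasks and the error are illustrative, and this assumes the conc API as used in the diff):

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	ctx := context.Background()

	// Same construction as in Run(): an error-collecting pool whose tasks receive a context.
	p := pool.New().WithErrors().WithContext(ctx)

	p.Go(func(ctx context.Context) error {
		fmt.Println("task one")
		return nil
	})
	p.Go(func(ctx context.Context) error {
		return errors.New("task two failed") // illustrative error
	})

	// Wait blocks until every task has returned and reports any errors they produced.
	if err := p.Wait(); err != nil {
		fmt.Println("pool error:", err)
	}
}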

func (w *Watcher) onStoppedLeading() {
for _, ch := range w.state {
select {
case ch <- false:
default:
w.log.WithField("state", "stopped").Error("failed to send state")
}
}
}

func (w *Watcher) onStartedLeading(_ context.Context) {
for _, ch := range w.state {
select {
case ch <- true:
default:
w.log.WithField("state", "started").Error("failed to send state")
}
}
}
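
Each state channel has a one-slot buffer and is written with a non-blocking send, so notifying the watchers never blocks the leader-election callbacks; if a previous signal is still unconsumed, the new one is dropped and logged. A small standalone illustration of that select/default pattern (values are arbitrary):

package main

import "fmt"

func main() {
	// One-slot buffer, as in NewWatcher.
	state := make(chan bool, 1)

	send := func(v bool) {
		select {
		case state <- v:
			fmt.Println("queued", v)
		default:
			fmt.Println("dropped", v) // receiver has not consumed the previous signal yet
		}
	}

	send(true)  // queued true
	send(false) // dropped false: the buffer still holds the unconsumed true
	fmt.Println(<-state)
}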

var regHorizontalPodAutoscaler = regexp.MustCompile(`New size: (\d+); reason: (\w+).*(below|above) target`)
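
For reference, the regexp above matches HPA event notes of roughly the following shape (the sample note is illustrative of a HorizontalPodAutoscaler SuccessfulRescale message, not taken from the PR):

package main

import (
	"fmt"
	"regexp"
)

var regHorizontalPodAutoscaler = regexp.MustCompile(`New size: (\d+); reason: (\w+).*(below|above) target`)

func main() {
	// Illustrative HPA event note.
	note := "New size: 4; reason: cpu resource utilization (percentage of request) above target"

	if m := regHorizontalPodAutoscaler.FindStringSubmatch(note); m != nil {
		fmt.Println("new size:", m[1])    // "4"
		fmt.Println("reason word:", m[2]) // "cpu"
		fmt.Println("direction:", m[3])   // "above"
	}
}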

func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interface) error {
func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interface, state chan bool) error {
for {
select {
case <-ctx.Done():
return nil
case s := <-state:
w.log.WithField("env", env).WithField("state", s).Info("state change")
if s {
if err := w.watch(ctx, env, client, state); err != nil {
w.log.WithError(err).Error("failed to watch events")
}
w.log.WithField("env", env).Info("stopped watching")

}
}
}
}

func (w *Watcher) watch(ctx context.Context, env string, client kubernetes.Interface, state chan bool) error {
// Events we want to watch for
// SuccessfulRescale - Check for successful rescale events
// Killing - Check for liveness failures

list, err := client.EventsV1().Events("").List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list events: %w", err)
}

w.log.WithField("len", len(list.Items)).Debug("listed events")

rescale, err := client.EventsV1().Events("").Watch(ctx, metav1.ListOptions{
FieldSelector: "reason=SuccessfulRescale,metadata.namespace!=nais-system",
})
@@ -99,10 +164,15 @@ func (w *Watcher) run(ctx context.Context, env string, client kubernetes.Interfa
w.events <- e
}

w.log.WithField("env", env).Debug("watching events")
for {
select {
case <-ctx.Done():
return nil
case s := <-state:
if !s {
return nil
}
case event := <-rescale.ResultChan():
handleEvent(event, func(e *eventv1.Event) (eventsql.UpsertParams, bool) {
if !strings.HasPrefix(e.Note, "New size") {
2 changes: 2 additions & 0 deletions internal/kubernetes/watcher/watcher.go
@@ -122,6 +122,7 @@ func (w *Watcher[T]) remove(cluster string, obj T) {
"cluster": cluster,
"name": obj.GetName(),
"namespace": obj.GetNamespace(),
"gvr": w.watchedType,
}).Debug("Removing object")
}

@@ -134,6 +135,7 @@ func (w *Watcher[T]) update(cluster string, obj T) {
"cluster": cluster,
"name": obj.GetName(),
"namespace": obj.GetNamespace(),
"gvr": w.watchedType,
}).Debug("Updating object")
}

21 changes: 21 additions & 0 deletions internal/leaderelection/leaderelection.go
@@ -14,6 +14,19 @@ import (

var elector *leaderelection.LeaderElector

var callbacks = struct {
onStartedLeading []func(context.Context)
onStoppedLeading []func()
}{}

func RegisterOnStartedLeading(f func(context.Context)) {
callbacks.onStartedLeading = append(callbacks.onStartedLeading, f)
}

func RegisterOnStoppedLeading(f func()) {
callbacks.onStoppedLeading = append(callbacks.onStoppedLeading, f)
}

func Start(ctx context.Context, client kubernetes.Interface, leaseName, namespace string, log logrus.FieldLogger) error {
id, err := os.Hostname()
if err != nil {
@@ -40,9 +53,17 @@ func Start(ctx context.Context, client kubernetes.Interface, leaseName, namespac
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(context.Context) {
log.Info("Started leading")

for _, f := range callbacks.onStartedLeading {
f(ctx)
}
},
OnStoppedLeading: func() {
log.Info("Stopped leading")

for _, f := range callbacks.onStoppedLeading {
f()
}
},
OnNewLeader: func(identity string) {
log.Infof("New leader: %s", identity)
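
A minimal sketch of how another component inside the nais/api module might use the new registration functions (startLeaderWork and stopLeaderWork are hypothetical stand-ins, and the internal package is only importable from within the module; leaderelection.Start is assumed to be wired up elsewhere):

package main

import (
	"context"
	"log"

	"github.com/nais/api/internal/leaderelection"
)

func main() {
	ctx := context.Background()

	// Hypothetical leader-only work; illustrative stand-ins.
	startLeaderWork := func(ctx context.Context) { log.Println("leader work started") }
	stopLeaderWork := func() { log.Println("leader work stopped") }

	leaderelection.RegisterOnStartedLeading(startLeaderWork)
	leaderelection.RegisterOnStoppedLeading(stopLeaderWork)

	// Mirror the event watcher's IsLeader() check to cover the case where leadership
	// was already acquired before these callbacks were registered.
	if leaderelection.IsLeader() {
		startLeaderWork(ctx)
	}
}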