Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ contentmanager: use namespace-scoped informers #1880

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions internal/operator-controller/contentmanager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -41,27 +43,29 @@ type CloserSyncingSource interface {
}

type sourcerer interface {
// Source returns a CloserSyncingSource for the provided
// GroupVersionKind. If the CloserSyncingSource encounters an
// Source returns a CloserSyncingSource for the provided namespace
// and GroupVersionKind. If the CloserSyncingSource encounters an
// error after having initially synced, it should requeue the
// provided client.Object and call the provided callback function
Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
Source(string, schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
}

type cache struct {
sources map[schema.GroupVersionKind]CloserSyncingSource
sources map[sourceKey]CloserSyncingSource
sourcerer sourcerer
owner client.Object
syncTimeout time.Duration
mu sync.Mutex
restMapper meta.RESTMapper
}

func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache {
func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration, rm meta.RESTMapper) Cache {
return &cache{
sources: make(map[schema.GroupVersionKind]CloserSyncingSource),
sources: make(map[sourceKey]CloserSyncingSource),
sourcerer: sourcerer,
owner: owner,
syncTimeout: syncTimeout,
restMapper: rm,
}
}

Expand All @@ -70,15 +74,15 @@ var _ Cache = (*cache)(nil)
func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error {
c.mu.Lock()
defer c.mu.Unlock()
gvkSet, err := gvksForObjects(objs...)
sourceKeySet, err := c.sourceKeysForObjects(objs...)
if err != nil {
return fmt.Errorf("getting set of GVKs for managed objects: %w", err)
}

if err := c.removeStaleSources(gvkSet); err != nil {
if err := c.removeStaleSources(sourceKeySet); err != nil {
return fmt.Errorf("removing stale sources: %w", err)
}
return c.startNewSources(ctx, gvkSet, watcher)
return c.startNewSources(ctx, sourceKeySet, watcher)
}

func (c *cache) Close() error {
Expand All @@ -99,29 +103,35 @@ func (c *cache) Close() error {
return errors.Join(errs...)
}

func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error {
cacheGvks := c.getCacheGVKs()
gvksToCreate := gvks.Difference(cacheGvks)
type sourceKey struct {
namespace string
gvk schema.GroupVersionKind
}

func (c *cache) startNewSources(ctx context.Context, sources sets.Set[sourceKey], watcher Watcher) error {
type startResult struct {
source CloserSyncingSource
gvk schema.GroupVersionKind
key sourceKey
err error
}
startResults := make(chan startResult)
wg := sync.WaitGroup{}
for _, gvk := range gvksToCreate.UnsortedList() {

existingSourceKeys := c.getCacheKeys()
sourcesToCreate := sources.Difference(existingSourceKeys)
for _, srcKey := range sourcesToCreate.UnsortedList() {
wg.Add(1)
go func() {
defer wg.Done()
source, err := c.startNewSource(ctx, gvk, watcher)
source, err := c.startNewSource(ctx, srcKey, watcher)
startResults <- startResult{
source: source,
gvk: gvk,
key: srcKey,
err: err,
}
}()
}

go func() {
wg.Wait()
close(startResults)
Expand All @@ -134,7 +144,7 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
continue
}

err := c.addSource(result.gvk, result.source)
err := c.addSource(result.key, result.source)
if err != nil {
// If we made it here then there is a logic error in
// calculating the diffs between what is currently being
Expand All @@ -146,20 +156,19 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
slices.SortFunc(sourcesErrors, func(a, b error) int {
return strings.Compare(a.Error(), b.Error())
})

return errors.Join(sourcesErrors...)
}

func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) {
s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) {
func (c *cache) startNewSource(ctx context.Context, srcKey sourceKey, watcher Watcher) (CloserSyncingSource, error) {
s, err := c.sourcerer.Source(srcKey.namespace, srcKey.gvk, c.owner, func(ctx context.Context) {
// this callback function ensures that we remove the source from the
// cache if it encounters an error after it initially synced successfully
c.mu.Lock()
defer c.mu.Unlock()
err := c.removeSource(gvk)
err := c.removeSource(srcKey)
if err != nil {
logr := log.FromContext(ctx)
logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk)
logr.Error(err, "managed content cache postSyncError removing source failed", "namespace", srcKey.namespace, "gvk", srcKey.gvk)
}
})
if err != nil {
Expand All @@ -168,7 +177,7 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,

err = watcher.Watch(s)
if err != nil {
return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err)
return nil, fmt.Errorf("establishing watch for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
}

syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout)
Expand All @@ -181,19 +190,19 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
return s, nil
}

func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error {
if _, ok := c.sources[gvk]; !ok {
c.sources[gvk] = source
func (c *cache) addSource(key sourceKey, source CloserSyncingSource) error {
if _, ok := c.sources[key]; !ok {
c.sources[key] = source
return nil
}
return errors.New("source already exists")
}

func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error {
cacheGvks := c.getCacheGVKs()
func (c *cache) removeStaleSources(srcKeys sets.Set[sourceKey]) error {
existingSrcKeys := c.getCacheKeys()
removeErrs := []error{}
gvksToRemove := cacheGvks.Difference(gvks)
for _, gvk := range gvksToRemove.UnsortedList() {
srcKeysToRemove := existingSrcKeys.Difference(srcKeys)
for _, gvk := range srcKeysToRemove.UnsortedList() {
err := c.removeSource(gvk)
if err != nil {
removeErrs = append(removeErrs, err)
Expand All @@ -207,23 +216,23 @@ func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error
return errors.Join(removeErrs...)
}

func (c *cache) removeSource(gvk schema.GroupVersionKind) error {
if source, ok := c.sources[gvk]; ok {
err := source.Close()
func (c *cache) removeSource(srcKey sourceKey) error {
if src, ok := c.sources[srcKey]; ok {
err := src.Close()
if err != nil {
return fmt.Errorf("closing source for GVK %q: %w", gvk, err)
return fmt.Errorf("closing source for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
}
}
delete(c.sources, gvk)
delete(c.sources, srcKey)
return nil
}

func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
cacheGvks := sets.New[schema.GroupVersionKind]()
for gvk := range c.sources {
cacheGvks.Insert(gvk)
func (c *cache) getCacheKeys() sets.Set[sourceKey] {
sourceKeys := sets.New[sourceKey]()
for key := range c.sources {
sourceKeys.Insert(key)
}
return cacheGvks
return sourceKeys
}

// gvksForObjects builds a sets.Set of GroupVersionKinds for
Expand All @@ -233,8 +242,8 @@ func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
//
// An empty Group is assumed to be the "core" Kubernetes
// API group.
func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) {
gvkSet := sets.New[schema.GroupVersionKind]()
func (c *cache) sourceKeysForObjects(objs ...client.Object) (sets.Set[sourceKey], error) {
sourceKeys := sets.New[sourceKey]()
for _, obj := range objs {
gvk := obj.GetObjectKind().GroupVersionKind()

Expand All @@ -257,8 +266,23 @@ func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], e
)
}

gvkSet.Insert(gvk)
// We shouldn't blindly accept the namespace value provided by the object.
// If the object is cluster-scoped, but includes a namespace for some reason,
// we need to make sure to create the source key with namespace set to
// corev1.NamespaceAll to ensure that the informer we start actually ends up
// watch the cluster-scoped object with a cluster-scoped informer.
mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, fmt.Errorf("adding %q with GVK %q to set; rest mapping failed: %w", obj.GetName(), gvk, err)
}

ns := corev1.NamespaceAll
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
ns = obj.GetNamespace()
}

sourceKeys.Insert(sourceKey{ns, gvk})
}

return gvkSet, nil
return sourceKeys, nil
}
Loading
Loading