Skip to content

Commit 6558075

Browse files
committed
contentmanager: use namespace-scoped informers
Signed-off-by: Joe Lanford <[email protected]>
1 parent a2ae8b8 commit 6558075

File tree

4 files changed

+76
-66
lines changed

4 files changed

+76
-66
lines changed

internal/operator-controller/contentmanager/cache/cache.go

+49-43
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ type CloserSyncingSource interface {
4141
}
4242

4343
type sourcerer interface {
44-
// Source returns a CloserSyncingSource for the provided
45-
// GroupVersionKind. If the CloserSyncingSource encounters an
44+
// Source returns a CloserSyncingSource for the provided namespace
45+
// and GroupVersionKind. If the CloserSyncingSource encounters an
4646
// error after having initially synced, it should requeue the
4747
// provided client.Object and call the provided callback function
48-
Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
48+
Source(string, schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error)
4949
}
5050

5151
type cache struct {
52-
sources map[schema.GroupVersionKind]CloserSyncingSource
52+
sources map[sourceKey]CloserSyncingSource
5353
sourcerer sourcerer
5454
owner client.Object
5555
syncTimeout time.Duration
@@ -58,7 +58,7 @@ type cache struct {
5858

5959
func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache {
6060
return &cache{
61-
sources: make(map[schema.GroupVersionKind]CloserSyncingSource),
61+
sources: make(map[sourceKey]CloserSyncingSource),
6262
sourcerer: sourcerer,
6363
owner: owner,
6464
syncTimeout: syncTimeout,
@@ -70,15 +70,15 @@ var _ Cache = (*cache)(nil)
7070
func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error {
7171
c.mu.Lock()
7272
defer c.mu.Unlock()
73-
gvkSet, err := gvksForObjects(objs...)
73+
sourceKeySet, err := sourceKeysForObjects(objs...)
7474
if err != nil {
7575
return fmt.Errorf("getting set of GVKs for managed objects: %w", err)
7676
}
7777

78-
if err := c.removeStaleSources(gvkSet); err != nil {
78+
if err := c.removeStaleSources(sourceKeySet); err != nil {
7979
return fmt.Errorf("removing stale sources: %w", err)
8080
}
81-
return c.startNewSources(ctx, gvkSet, watcher)
81+
return c.startNewSources(ctx, sourceKeySet, watcher)
8282
}
8383

8484
func (c *cache) Close() error {
@@ -99,29 +99,35 @@ func (c *cache) Close() error {
9999
return errors.Join(errs...)
100100
}
101101

102-
func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error {
103-
cacheGvks := c.getCacheGVKs()
104-
gvksToCreate := gvks.Difference(cacheGvks)
102+
type sourceKey struct {
103+
namespace string
104+
gvk schema.GroupVersionKind
105+
}
105106

107+
func (c *cache) startNewSources(ctx context.Context, sources sets.Set[sourceKey], watcher Watcher) error {
106108
type startResult struct {
107109
source CloserSyncingSource
108-
gvk schema.GroupVersionKind
110+
key sourceKey
109111
err error
110112
}
111113
startResults := make(chan startResult)
112114
wg := sync.WaitGroup{}
113-
for _, gvk := range gvksToCreate.UnsortedList() {
115+
116+
existingSourceKeys := c.getCacheKeys()
117+
sourcesToCreate := sources.Difference(existingSourceKeys)
118+
for _, srcKey := range sourcesToCreate.UnsortedList() {
114119
wg.Add(1)
115120
go func() {
116121
defer wg.Done()
117-
source, err := c.startNewSource(ctx, gvk, watcher)
122+
source, err := c.startNewSource(ctx, srcKey, watcher)
118123
startResults <- startResult{
119124
source: source,
120-
gvk: gvk,
125+
key: srcKey,
121126
err: err,
122127
}
123128
}()
124129
}
130+
125131
go func() {
126132
wg.Wait()
127133
close(startResults)
@@ -134,7 +140,7 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
134140
continue
135141
}
136142

137-
err := c.addSource(result.gvk, result.source)
143+
err := c.addSource(result.key, result.source)
138144
if err != nil {
139145
// If we made it here then there is a logic error in
140146
// calculating the diffs between what is currently being
@@ -146,20 +152,19 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV
146152
slices.SortFunc(sourcesErrors, func(a, b error) int {
147153
return strings.Compare(a.Error(), b.Error())
148154
})
149-
150155
return errors.Join(sourcesErrors...)
151156
}
152157

153-
func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) {
154-
s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) {
158+
func (c *cache) startNewSource(ctx context.Context, srcKey sourceKey, watcher Watcher) (CloserSyncingSource, error) {
159+
s, err := c.sourcerer.Source(srcKey.namespace, srcKey.gvk, c.owner, func(ctx context.Context) {
155160
// this callback function ensures that we remove the source from the
156161
// cache if it encounters an error after it initially synced successfully
157162
c.mu.Lock()
158163
defer c.mu.Unlock()
159-
err := c.removeSource(gvk)
164+
err := c.removeSource(srcKey)
160165
if err != nil {
161166
logr := log.FromContext(ctx)
162-
logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk)
167+
logr.Error(err, "managed content cache postSyncError removing source failed", "namespace", srcKey.namespace, "gvk", srcKey.gvk)
163168
}
164169
})
165170
if err != nil {
@@ -168,7 +173,7 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
168173

169174
err = watcher.Watch(s)
170175
if err != nil {
171-
return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err)
176+
return nil, fmt.Errorf("establishing watch for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
172177
}
173178

174179
syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout)
@@ -181,19 +186,19 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind,
181186
return s, nil
182187
}
183188

184-
func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error {
185-
if _, ok := c.sources[gvk]; !ok {
186-
c.sources[gvk] = source
189+
func (c *cache) addSource(key sourceKey, source CloserSyncingSource) error {
190+
if _, ok := c.sources[key]; !ok {
191+
c.sources[key] = source
187192
return nil
188193
}
189194
return errors.New("source already exists")
190195
}
191196

192-
func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error {
193-
cacheGvks := c.getCacheGVKs()
197+
func (c *cache) removeStaleSources(srcKeys sets.Set[sourceKey]) error {
198+
existingSrcKeys := c.getCacheKeys()
194199
removeErrs := []error{}
195-
gvksToRemove := cacheGvks.Difference(gvks)
196-
for _, gvk := range gvksToRemove.UnsortedList() {
200+
srcKeysToRemove := existingSrcKeys.Difference(srcKeys)
201+
for _, gvk := range srcKeysToRemove.UnsortedList() {
197202
err := c.removeSource(gvk)
198203
if err != nil {
199204
removeErrs = append(removeErrs, err)
@@ -207,23 +212,23 @@ func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error
207212
return errors.Join(removeErrs...)
208213
}
209214

210-
func (c *cache) removeSource(gvk schema.GroupVersionKind) error {
211-
if source, ok := c.sources[gvk]; ok {
212-
err := source.Close()
215+
func (c *cache) removeSource(srcKey sourceKey) error {
216+
if src, ok := c.sources[srcKey]; ok {
217+
err := src.Close()
213218
if err != nil {
214-
return fmt.Errorf("closing source for GVK %q: %w", gvk, err)
219+
return fmt.Errorf("closing source for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err)
215220
}
216221
}
217-
delete(c.sources, gvk)
222+
delete(c.sources, srcKey)
218223
return nil
219224
}
220225

221-
func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
222-
cacheGvks := sets.New[schema.GroupVersionKind]()
223-
for gvk := range c.sources {
224-
cacheGvks.Insert(gvk)
226+
func (c *cache) getCacheKeys() sets.Set[sourceKey] {
227+
sourceKeys := sets.New[sourceKey]()
228+
for key := range c.sources {
229+
sourceKeys.Insert(key)
225230
}
226-
return cacheGvks
231+
return sourceKeys
227232
}
228233

229234
// gvksForObjects builds a sets.Set of GroupVersionKinds for
@@ -233,9 +238,10 @@ func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] {
233238
//
234239
// An empty Group is assumed to be the "core" Kubernetes
235240
// API group.
236-
func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) {
237-
gvkSet := sets.New[schema.GroupVersionKind]()
241+
func sourceKeysForObjects(objs ...client.Object) (sets.Set[sourceKey], error) {
242+
sourceKeys := sets.New[sourceKey]()
238243
for _, obj := range objs {
244+
ns := obj.GetNamespace()
239245
gvk := obj.GetObjectKind().GroupVersionKind()
240246

241247
// If the Kind or Version is not set in an object's GroupVersionKind
@@ -257,8 +263,8 @@ func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], e
257263
)
258264
}
259265

260-
gvkSet.Insert(gvk)
266+
sourceKeys.Insert(sourceKey{ns, gvk})
261267
}
262268

263-
return gvkSet, nil
269+
return sourceKeys, nil
264270
}

internal/operator-controller/contentmanager/cache/cache_test.go

+22-18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
corev1 "k8s.io/api/core/v1"
1111
"k8s.io/apimachinery/pkg/runtime/schema"
12+
"k8s.io/apimachinery/pkg/util/rand"
1213
"k8s.io/client-go/util/workqueue"
1314
"sigs.k8s.io/controller-runtime/pkg/client"
1415
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -34,7 +35,7 @@ type mockSourcerer struct {
3435

3536
var _ sourcerer = (*mockSourcerer)(nil)
3637

37-
func (ms *mockSourcerer) Source(_ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) {
38+
func (ms *mockSourcerer) Source(_ string, _ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) {
3839
if ms.err != nil {
3940
return nil, ms.err
4041
}
@@ -69,11 +70,11 @@ func TestCacheWatch(t *testing.T) {
6970
)
7071

7172
pod := &corev1.Pod{}
72-
podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
73-
pod.SetGroupVersionKind(podGvk)
73+
pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
74+
pod.SetNamespace(rand.String(8))
7475

7576
require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod))
76-
require.Contains(t, c.(*cache).sources, podGvk, "sources", c.(*cache).sources)
77+
require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()}, "sources", c.(*cache).sources)
7778
}
7879

7980
func TestCacheWatchInvalidGVK(t *testing.T) {
@@ -147,9 +148,9 @@ func TestCacheWatchExistingSourceNotPanic(t *testing.T) {
147148
)
148149

149150
pod := &corev1.Pod{}
150-
podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
151-
pod.SetGroupVersionKind(podGvk)
152-
require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{}))
151+
pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
152+
pod.SetNamespace(rand.String(8))
153+
require.NoError(t, c.(*cache).addSource(sourceKey{pod.Namespace, pod.GroupVersionKind()}, &mockSource{}))
153154

154155
// In this case, a panic means there is a logic error somewhere in the
155156
// cache.Watch() method. It should never hit the condition where it panics
@@ -167,18 +168,18 @@ func TestCacheWatchRemovesStaleSources(t *testing.T) {
167168
)
168169

169170
pod := &corev1.Pod{}
170-
podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
171-
pod.SetGroupVersionKind(podGvk)
171+
pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
172+
pod.SetNamespace(rand.String(8))
172173

173174
require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod))
174-
require.Contains(t, c.(*cache).sources, podGvk)
175+
require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()})
175176

176177
secret := &corev1.Secret{}
177-
secretGvk := corev1.SchemeGroupVersion.WithKind("Secret")
178-
secret.SetGroupVersionKind(secretGvk)
178+
secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
179+
secret.SetNamespace(rand.String(8))
179180
require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, secret))
180-
require.Contains(t, c.(*cache).sources, secretGvk)
181-
require.NotContains(t, c.(*cache).sources, podGvk)
181+
require.Contains(t, c.(*cache).sources, sourceKey{secret.Namespace, secret.GroupVersionKind()})
182+
require.NotContains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()})
182183
}
183184

184185
func TestCacheWatchRemovingStaleSourcesError(t *testing.T) {
@@ -190,13 +191,16 @@ func TestCacheWatchRemovingStaleSourcesError(t *testing.T) {
190191
time.Second,
191192
)
192193

193-
podGvk := corev1.SchemeGroupVersion.WithKind("Pod")
194-
require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{
194+
podSourceKey := sourceKey{
195+
namespace: rand.String(8),
196+
gvk: corev1.SchemeGroupVersion.WithKind("Pod"),
197+
}
198+
require.NoError(t, c.(*cache).addSource(podSourceKey, &mockSource{
195199
err: errors.New("error"),
196200
}))
197201

198202
secret := &corev1.Secret{}
199-
secretGvk := corev1.SchemeGroupVersion.WithKind("Secret")
200-
secret.SetGroupVersionKind(secretGvk)
203+
secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
204+
secret.SetNamespace(rand.String(8))
201205
require.Error(t, c.Watch(context.Background(), &mockWatcher{}, secret))
202206
}

internal/operator-controller/contentmanager/contentmanager.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ func (i *managerImpl) Get(ctx context.Context, ce *ocv1.ClusterExtension) (cmcac
116116
// related to reusing an informer factory, we return a new informer
117117
// factory every time to ensure we are not attempting to configure or
118118
// start an already started informer
119-
informerFactoryCreateFunc: func() dynamicinformer.DynamicSharedInformerFactory {
120-
return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, metav1.NamespaceAll, func(lo *metav1.ListOptions) {
119+
informerFactoryCreateFunc: func(namespace string) dynamicinformer.DynamicSharedInformerFactory {
120+
return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, namespace, func(lo *metav1.ListOptions) {
121121
lo.LabelSelector = tgtLabels.AsSelector().String()
122122
})
123123
},

internal/operator-controller/contentmanager/sourcerer.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import (
2121
)
2222

2323
type dynamicSourcerer struct {
24-
informerFactoryCreateFunc func() dynamicinformer.DynamicSharedInformerFactory
24+
informerFactoryCreateFunc func(namespace string) dynamicinformer.DynamicSharedInformerFactory
2525
mapper meta.RESTMapper
2626
}
2727

28-
func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) {
28+
func (ds *dynamicSourcerer) Source(namespace string, gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) {
2929
scheme, err := buildScheme(gvk)
3030
if err != nil {
3131
return nil, fmt.Errorf("building scheme: %w", err)
@@ -48,7 +48,7 @@ func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Obj
4848
GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true },
4949
},
5050
},
51-
DynamicInformerFactory: ds.informerFactoryCreateFunc(),
51+
DynamicInformerFactory: ds.informerFactoryCreateFunc(namespace),
5252
OnPostSyncError: onPostSyncError,
5353
})
5454
return s, nil

0 commit comments

Comments
 (0)