
[WIP][release-4.14] OCPBUGS-48663: Fix concurrent namespace resolution #949

Open · wants to merge 4 commits into release-4.14
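
Judging from the diff, the change set appears to do four related things: migrate every workqueue in the catalog operator from the since-deprecated NewNamedRateLimitingQueue constructor to NewRateLimitingQueueWithConfig; move deletion handling out of the legacy syncers (ToSyncerWithDelete) and into a dedicated WithDeletionHandler option; key the namespace-resolution queue with types.NamespacedName values instead of bare namespace strings; and remove the SubscriptionDeletedState machinery that the subscription reconciler no longer reaches. The accompanying CatalogSource CRD refresh adds the extractContent schema and drops the clause stating that memoryTarget also sets the container memory limit to 200% of its value.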
@@ -532,8 +532,21 @@ spec:
topologyKey:
description: This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching the labelSelector in the specified namespaces, where co-located is defined as running on a node whose value of the label with key topologyKey matches that of any node on which any of the selected pods is running. Empty topologyKey is not allowed.
type: string
extractContent:
description: ExtractContent configures the gRPC catalog Pod to extract catalog metadata from the provided index image and use a well-known version of the `opm` server to expose it. The catalog index image that this CatalogSource is configured to use *must* be using the file-based catalogs in order to utilize this feature.
type: object
required:
- cacheDir
- catalogDir
properties:
cacheDir:
description: CacheDir is the directory storing the pre-calculated API cache.
type: string
catalogDir:
description: CatalogDir is the directory storing the file-based catalog contents.
type: string
memoryTarget:
description: "MemoryTarget configures the $GOMEMLIMIT value for the gRPC catalog Pod. This is a soft memory limit for the server, which the runtime will attempt to meet but makes no guarantees that it will do so. If this value is set, the Pod will have the following modifications made to the container running the server: - the $GOMEMLIMIT environment variable will be set to this value in bytes - the memory request will be set to this value - the memory limit will be set to 200% of this value \n This field should be set if it's desired to reduce the footprint of a catalog server as much as possible, or if a catalog being served is very large and needs more than the default allocation. If your index image has a file- system cache, determine a good approximation for this value by doubling the size of the package cache at /tmp/cache/cache/packages.json in the index image. \n This field is best-effort; if unset, no default will be used and no Pod memory limit or $GOMEMLIMIT value will be set."
description: "MemoryTarget configures the $GOMEMLIMIT value for the gRPC catalog Pod. This is a soft memory limit for the server, which the runtime will attempt to meet but makes no guarantees that it will do so. If this value is set, the Pod will have the following modifications made to the container running the server: - the $GOMEMLIMIT environment variable will be set to this value in bytes - the memory request will be set to this value \n This field should be set if it's desired to reduce the footprint of a catalog server as much as possible, or if a catalog being served is very large and needs more than the default allocation. If your index image has a file- system cache, determine a good approximation for this value by doubling the size of the package cache at /tmp/cache/cache/packages.json in the index image. \n This field is best-effort; if unset, no default will be used and no Pod memory limit or $GOMEMLIMIT value will be set."
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
anyOf:
- type: integer
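
For orientation, a hedged sketch of a CatalogSource exercising the two fields above; the Go types (GrpcPodConfig, ExtractContentConfig, MemoryTarget) are assumed from this CRD fragment and the operator-framework API, and the image reference is a placeholder:

    package main

    import (
    	"fmt"

    	operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1"
    	"k8s.io/apimachinery/pkg/api/resource"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    func main() {
    	// Sets $GOMEMLIMIT and the memory request; per the updated description
    	// above, the memory limit is no longer pinned to 200% of this value.
    	memoryTarget := resource.MustParse("30Mi")

    	catsrc := operatorsv1alpha1.CatalogSource{
    		ObjectMeta: metav1.ObjectMeta{Name: "example-catalog", Namespace: "olm"},
    		Spec: operatorsv1alpha1.CatalogSourceSpec{
    			SourceType: operatorsv1alpha1.SourceTypeGrpc,
    			// Placeholder image; it must be a file-based catalog for
    			// extractContent to work.
    			Image: "quay.io/example/fbc-index:latest",
    			GrpcPodConfig: &operatorsv1alpha1.GrpcPodConfig{
    				ExtractContent: &operatorsv1alpha1.ExtractContentConfig{
    					CacheDir:   "/tmp/cache", // pre-calculated API cache
    					CatalogDir: "/configs",   // file-based catalog contents
    				},
    				MemoryTarget: &memoryTarget,
    			},
    		},
    	}
    	fmt.Println(catsrc.Name)
    }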
@@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
@@ -247,7 +248,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire InstallPlans
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
ipQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ips")
ipQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "ips",
})
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
ipQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
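
The same constructor swap repeats below for the ogs, catsrcs, subs, and resolve queues. A minimal standalone sketch of the migration (RateLimitingQueueConfig exists in client-go v0.27+); only how the queue's name is supplied changes, not the queue's behavior:

    package main

    import "k8s.io/client-go/util/workqueue"

    func main() {
    	// Before: the deprecated named constructor took the name positionally.
    	before := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ips")

    	// After: the name travels in a config struct.
    	after := workqueue.NewRateLimitingQueueWithConfig(
    		workqueue.DefaultControllerRateLimiter(),
    		workqueue.RateLimitingQueueConfig{Name: "ips"},
    	)

    	before.ShutDown()
    	after.ShutDown()
    }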
@@ -266,7 +270,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs")
ogQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "ogs",
})
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
@@ -285,15 +292,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catsrcQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "catsrcs")
catsrcQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "catsrcs",
})
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
catsrcQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithMetricsProvider(metrics.NewMetricsCatalogSource(op.lister.OperatorsV1alpha1().CatalogSourceLister())),
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(catsrcQueue),
queueinformer.WithInformer(catsrcInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()),
queueinformer.WithDeletionHandler(op.handleCatSrcDeletion),
)
if err != nil {
return nil, err
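
With deletion split out of the syncer, the function registered via WithDeletionHandler receives the informer's raw delete payload, which can be a cache.DeletedFinalStateUnknown tombstone when a watch missed the delete and a re-list reconstructed it. An illustrative, self-contained version of the unwrap pattern that handleCatSrcDeletion (further down) still performs; the onDelete name and the sample object are stand-ins:

    package main

    import (
    	"fmt"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/tools/cache"
    )

    // onDelete is illustrative; the real handlers in this file are
    // handleDeletion and handleCatSrcDeletion.
    func onDelete(obj interface{}) {
    	o, ok := obj.(metav1.Object)
    	if !ok {
    		// A re-list can replace a missed delete event with a tombstone
    		// carrying the object's last known state.
    		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    		if !ok {
    			fmt.Printf("couldn't get object from tombstone %#v\n", obj)
    			return
    		}
    		o, ok = tombstone.Obj.(metav1.Object)
    		if !ok {
    			fmt.Printf("tombstone contained unexpected object %#v\n", obj)
    			return
    		}
    	}
    	fmt.Println("deleted:", o.GetName())
    }

    func main() {
    	onDelete(cache.DeletedFinalStateUnknown{
    		Key: "olm/my-catalog",
    		Obj: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: "my-catalog"}},
    	})
    }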
@@ -311,7 +322,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer

subQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "subs")
subQueue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "subs",
})
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
subSyncer, err := subscription.NewSyncer(
ctx,
@@ -322,7 +336,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subscription.WithCatalogInformer(catsrcInformer.Informer()),
subscription.WithInstallPlanInformer(ipInformer.Informer()),
subscription.WithSubscriptionQueue(subQueue),
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
subscription.WithRegistryReconcilerFactory(op.reconciler),
subscription.WithGlobalCatalogNamespace(op.namespace),
)
@@ -407,13 +421,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

// Generate and register QueueInformers for k8s resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()
for _, informer := range sharedIndexInformers {
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(k8sSyncer),
queueinformer.WithDeletionHandler(op.handleDeletion),
)
if err != nil {
return nil, err
@@ -451,7 +466,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()),
queueinformer.WithDeletionHandler(op.handleDeletion),
)
if err != nil {
return nil, err
@@ -463,7 +479,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
op.nsResolveQueue = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "resolve",
})
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
@@ -502,12 +521,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
o.nsResolveQueue.Add(types.NamespacedName{Name: ns})
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace})
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
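
Every producer that feeds nsResolveQueue now wraps the namespace in types.NamespacedName, so the queue sees one comparable key type throughout; mixing strings and structs would let the same namespace sit in the queue twice and be worked concurrently, which matches the symptom in the PR title. A small sketch of the deduplication, with an assumed namespace name:

    package main

    import (
    	"fmt"

    	"k8s.io/apimachinery/pkg/types"
    	"k8s.io/client-go/util/workqueue"
    )

    func main() {
    	q := workqueue.NewRateLimitingQueueWithConfig(
    		workqueue.DefaultControllerRateLimiter(),
    		workqueue.RateLimitingQueueConfig{Name: "resolve"},
    	)
    	defer q.ShutDown()

    	// NamespacedName is a comparable struct, so repeated adds of the same
    	// namespace collapse into a single queue entry, exactly as plain
    	// strings did. "openshift-operators" is just an example namespace.
    	q.Add(types.NamespacedName{Name: "openshift-operators"})
    	q.Add(types.NamespacedName{Name: "openshift-operators"})
    	fmt.Println(q.Len()) // 1
    }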
@@ -588,18 +607,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
catsrc, ok := obj.(metav1.Object)
if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}

		catsrc, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
			return
		}
}
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
@@ -1118,8 +1135,8 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
return updateErr
}

logger.Debug("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second)
return nil
}
}
@@ -1201,7 +1218,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return fmt.Errorf("casting Subscription failed")
}

o.nsResolveQueue.Add(sub.GetNamespace())
o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()})

return nil
}
@@ -1215,7 +1232,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())
o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()})

return nil
}
@@ -1861,12 +1861,15 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
client: clientFake,
lister: lister,
namespace: namespace,
nsResolveQueue: workqueue.NewNamedRateLimitingQueue(
nsResolveQueue: workqueue.NewRateLimitingQueueWithConfig(
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
), "resolver"),
),
workqueue.RateLimitingQueueConfig{
Name: "resolver",
}),
resolver: config.resolver,
reconciler: config.reconciler,
recorder: config.recorder,
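
The fake operator's resolver queue composes its limiter from a per-item exponential backoff and an overall token bucket, taking whichever delay is larger. A standalone sketch of what that composition does:

    package main

    import (
    	"fmt"
    	"time"

    	"golang.org/x/time/rate"
    	"k8s.io/client-go/util/workqueue"
    )

    func main() {
    	rl := workqueue.NewMaxOfRateLimiter(
    		// Per-item: 1s, 2s, 4s, ... capped at 1000s for repeated failures.
    		workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 1000*time.Second),
    		// Overall: 1 qps with a burst of 100, independent of the item.
    		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
    	)

    	// Requeueing the same item repeatedly backs off exponentially.
    	fmt.Println(rl.When("ns-a")) // ~1s
    	fmt.Println(rl.When("ns-a")) // ~2s
    	fmt.Println(rl.When("ns-a")) // ~4s
    }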
@@ -23,18 +23,14 @@ import (

// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler {
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
out = in
switch s := in.(type) {
case SubscriptionExistsState:
if sync != nil {
err = sync(s.Subscription())
}
case SubscriptionDeletedState:
if onDelete != nil {
onDelete(s.Subscription())
}
case SubscriptionState:
if sync != nil {
err = sync(s.Subscription())
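
With the onDelete parameter gone, the adapter's only job is to run the legacy sync for states where the subscription still exists. A simplified sketch of the shape, using stand-in types since kubestate and queueinformer are this repo's internal packages:

    package main

    import (
    	"context"
    	"fmt"
    )

    // Stand-ins for the repo's kubestate.State hierarchy.
    type State interface{}

    type SubscriptionExistsState struct{ Name string }

    type LegacySyncHandler func(obj interface{}) error

    type ReconcilerFunc func(ctx context.Context, in State) (State, error)

    // The adapter now wires only the sync path; deletions are delivered to a
    // dedicated deletion handler on the queueinformer instead.
    func ReconcilerFromLegacySyncHandler(sync LegacySyncHandler) ReconcilerFunc {
    	return func(ctx context.Context, in State) (State, error) {
    		if s, ok := in.(SubscriptionExistsState); ok && sync != nil {
    			return in, sync(s.Name)
    		}
    		return in, nil
    	}
    }

    func main() {
    	rec := ReconcilerFromLegacySyncHandler(func(obj interface{}) error {
    		fmt.Println("synced", obj)
    		return nil
    	})
    	if _, err := rec(context.Background(), SubscriptionExistsState{Name: "sub-a"}); err != nil {
    		fmt.Println(err)
    	}
    }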
@@ -25,7 +25,6 @@ type SubscriptionState interface {
Subscription() *v1alpha1.Subscription
Add() SubscriptionExistsState
Update() SubscriptionExistsState
Delete() SubscriptionDeletedState
}

// SubscriptionExistsState describes subscription states in which the subscription exists on the cluster.
@@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface {
isSubscriptionUpdatedState()
}

// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster.
type SubscriptionDeletedState interface {
SubscriptionState

isSubscriptionDeletedState()
}

// CatalogHealthState describes subscription states that represent a subscription with respect to catalog health.
type CatalogHealthState interface {
SubscriptionExistsState
@@ -175,12 +167,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState {
}
}

func (s *subscriptionState) Delete() SubscriptionDeletedState {
return &subscriptionDeletedState{
SubscriptionState: s,
}
}

func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState {
return &subscriptionState{
State: kubestate.NewState(),
@@ -206,12 +192,6 @@ type subscriptionUpdatedState struct {

func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {}

type subscriptionDeletedState struct {
SubscriptionState
}

func (c *subscriptionDeletedState) isSubscriptionDeletedState() {}

type catalogHealthState struct {
SubscriptionExistsState
}