Skip to content

Commit

Permalink
simplify client informer handling (#811)
Browse files Browse the repository at this point in the history
* simplify client informer handling

Signed-off-by: Frank Jogeleit <[email protected]>

* add new test cases

Signed-off-by: Frank Jogeleit <[email protected]>

* update startup logic to handle runtime target changes

Signed-off-by: Frank Jogeleit <[email protected]>

---------

Signed-off-by: Frank Jogeleit <[email protected]>
  • Loading branch information
fjogeleit authored Mar 10, 2025
1 parent 57ee0da commit ee11c57
Show file tree
Hide file tree
Showing 22 changed files with 738 additions and 206 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ test:
.PHONY: coverage
coverage:
go test -v ./... -covermode=count -coverprofile=coverage.out.tmp -timeout=30s
cat coverage.out.tmp | grep -v "github.com/kyverno/policy-reporter/cmd/" | grep -v "github.com/kyverno/policy-reporter/main.go" | grep -v "github.com/kyverno/policy-reporter/pkg/crd/" > coverage.out
cat coverage.out.tmp | grep -v "github.com/kyverno/policy-reporter/cmd/" | grep -v "github.com/kyverno/policy-reporter/main.go" | grep -v "github.com/kyverno/policy-reporter/pkg/crd/" | grep -v "github.com/kyverno/policy-reporter/hack/main.go" | grep -v "github.com/kyverno/policy-reporter/hack/controller-gen/" > coverage.out
rm coverage.out.tmp

.PHONY: build
Expand Down
21 changes: 6 additions & 15 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/kyverno/policy-reporter/pkg/config"
"github.com/kyverno/policy-reporter/pkg/database"
"github.com/kyverno/policy-reporter/pkg/listener"
"github.com/kyverno/policy-reporter/pkg/targetconfig"
)

func newRunCMD(version string) *cobra.Command {
Expand Down Expand Up @@ -63,7 +62,6 @@ func newRunCMD(version string) *cobra.Command {
return err
}

targetChan := make(chan targetconfig.TcEvent)
g := &errgroup.Group{}

var store *database.Store
Expand Down Expand Up @@ -142,7 +140,7 @@ func newRunCMD(version string) *cobra.Command {
}
}

resolver.RegisterSendResultListener(targetChan)
resolver.RegisterSendResultListener()

readinessProbe.Ready()
}).RegisterOnNew(func(currentID, lockID string) {
Expand All @@ -167,7 +165,7 @@ func newRunCMD(version string) *cobra.Command {
return elector.Run(cmd.Context())
})
} else {
resolver.RegisterSendResultListener(targetChan)
resolver.RegisterSendResultListener()
readinessProbe.Ready()
}

Expand All @@ -179,11 +177,8 @@ func newRunCMD(version string) *cobra.Command {
g.Go(server.Start)

g.Go(func() error {
// call TargetClients to ensure targets passed from the config file are initialized
resolver.TargetClients()
stop := make(chan struct{})

_, err = resolver.TargetConfigClient(targetChan)
_, err = resolver.TargetConfigClient()
if err != nil {
return err
}
Expand All @@ -196,7 +191,7 @@ func newRunCMD(version string) *cobra.Command {
})

g.Go(func() error {
logger.Info("wait policy informer")
logger.Info("wait for policy informer")
readinessProbe.Wait()

logger.Info("start client", zap.Int("worker", c.WorkerCount))
Expand All @@ -212,15 +207,11 @@ func newRunCMD(version string) *cobra.Command {
})

g.Go(func() error {
collection := resolver.TargetClients()
if !collection.UsesSecrets() {
return nil
}

logger.Info("wait for secret informer")
readinessProbe.Wait()

stop := make(chan struct{})
if err := secretInformer.Sync(collection, stop); err != nil {
if err := secretInformer.Sync(resolver.TargetClients(), stop); err != nil {
zap.L().Error("secret informer error", zap.Error(err))

return err
Expand Down
1 change: 0 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ type Cache interface {
GetResults(id string) []string
Shared() bool
Clear()
Clone() Cache
}
20 changes: 0 additions & 20 deletions pkg/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,6 @@ func (c *inMemoryCache) Shared() bool {
return false
}

func (c *inMemoryCache) Clone() Cache {
oldItems := c.caches.Items()
// this is the upper cache
newCache := gocache.New(gocache.NoExpiration, 5*time.Minute)

for key, item := range oldItems {
c2 := item.Object.(*gocache.Cache).Items()
innerCache := gocache.New(gocache.NoExpiration, 5*time.Minute)

for innerKey := range c2 {
innerCache.Set(innerKey, struct{}{}, c.keepDuration)
}

newCache.Set(key, innerCache, c.keepDuration)
}
return &inMemoryCache{
caches: newCache,
}
}

func NewInMermoryCache(keepDuration, keepReport time.Duration) Cache {
cache := gocache.New(gocache.NoExpiration, 5*time.Minute)
cache.OnEvicted(func(s string, i interface{}) {
Expand Down
5 changes: 0 additions & 5 deletions pkg/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ func (r *redisCache) AddReport(report v1alpha2.ReportInterface) {
}
}

// doesn't make sense in redis
func (r *redisCache) Clone() Cache {
return r
}

func (r *redisCache) RemoveReport(id string) {
keys, err := r.rdb.Keys(context.Background(), r.generateKeyPattern(id)).Result()
if err != nil {
Expand Down
44 changes: 10 additions & 34 deletions pkg/config/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"strings"
"time"
Expand Down Expand Up @@ -63,7 +62,7 @@ type Resolver struct {
targetClients *target.Collection
targetsCreated bool
targetFactory target.Factory
targetConfigClient *targetconfig.TargetConfigClient
targetConfigClient *targetconfig.Client
logger *zap.Logger
resultListener *listener.ResultListener
polrRestartCh chan struct{}
Expand Down Expand Up @@ -261,42 +260,22 @@ func (r *Resolver) Queue() (*kubernetes.Queue, error) {
func (r *Resolver) RegisterNewResultsListener() {
targets := r.TargetClients()

newResultListener := listener.NewResultListener(r.SkipExistingOnStartup(), r.ResultCache(), time.Now())
r.resultListener = newResultListener
r.EventPublisher().RegisterListener(listener.NewResults, newResultListener.Listen)
r.resultListener = listener.NewResultListener(r.SkipExistingOnStartup(), r.ResultCache(), time.Now())
r.EventPublisher().RegisterListener(listener.NewResults, r.resultListener.Listen)

r.EventPublisher().RegisterPostListener(listener.CleanUpListener, listener.NewCleanupListener(context.Background(), targets))
}

// RegisterSendResultListener resolver method
func (r *Resolver) RegisterSendResultListener(targetChan chan targetconfig.TcEvent) {
registerFunc := func(targets *target.Collection) {
r.resultListener.RegisterListener(listener.NewSendResultListener(targets))
r.resultListener.RegisterScopeListener(listener.NewSendScopeResultsListener(targets))
r.resultListener.RegisterSyncListener(listener.NewSendSyncResultsListener(targets))
}

go func() {
r.logger.Info("starting listener loop")
for {
select {
case event := <-targetChan:
// clear existing listeners and create new ones
r.logger.Info(fmt.Sprintf("received targetconfig event of type %s", event.Type))
r.resultListener.ResetListeners()
registerFunc(event.Targets)

case <-time.After(5 * time.Second):
}
}
}()

func (r *Resolver) RegisterSendResultListener() {
targets := r.TargetClients()
if r.resultListener == nil {
r.RegisterNewResultsListener()
}

registerFunc(targets)
r.resultListener.RegisterListener(listener.NewSendResultListener(targets))
r.resultListener.RegisterScopeListener(listener.NewSendScopeResultsListener(targets))
r.resultListener.RegisterSyncListener(listener.NewSendSyncResultsListener(targets))
}

// UnregisterSendResultListener resolver method
Expand Down Expand Up @@ -569,7 +548,7 @@ func (r *Resolver) EmailClient() *email.Client {
return email.NewClient(r.config.EmailReports.SMTP.From, r.SMTPServer())
}

func (r *Resolver) TargetConfigClient(targetChan chan targetconfig.TcEvent) (*targetconfig.TargetConfigClient, error) {
func (r *Resolver) TargetConfigClient() (*targetconfig.Client, error) {
if r.targetConfigClient != nil {
return r.targetConfigClient, nil
}
Expand All @@ -579,11 +558,8 @@ func (r *Resolver) TargetConfigClient(targetChan chan targetconfig.TcEvent) (*ta
return nil, err
}

tcc := targetconfig.NewTargetConfigClient(tcClient, r.TargetFactory(), r.TargetClients(), r.logger)
err = tcc.CreateInformer(targetChan)
if err != nil {
return nil, err
}
tcc := targetconfig.NewClient(tcClient, r.TargetFactory(), r.TargetClients(), r.logger)
tcc.ConfigureInformer()

r.targetConfigClient = tcc
return tcc, nil
Expand Down
4 changes: 1 addition & 3 deletions pkg/config/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/kyverno/policy-reporter/pkg/database"
"github.com/kyverno/policy-reporter/pkg/report"
"github.com/kyverno/policy-reporter/pkg/target"
"github.com/kyverno/policy-reporter/pkg/targetconfig"
)

var targets = target.Targets{
Expand Down Expand Up @@ -464,8 +463,7 @@ func Test_RegisterSendResultListener(t *testing.T) {
t.Run("Register SendResultListener with Targets", func(t *testing.T) {
resolver := config.NewResolver(testConfig, &rest.Config{})
resolver.Logger()
targetChan := make(chan targetconfig.TcEvent)
resolver.RegisterSendResultListener(targetChan)
resolver.RegisterSendResultListener()

assert.Len(t, resolver.EventPublisher().GetListener(), 1, "Expected one Listener to be registered")
})
Expand Down
55 changes: 55 additions & 0 deletions pkg/fixtures/policy_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,44 @@ var PassPodResult = v1alpha2.PolicyReportResult{
Properties: map[string]string{},
}

var WarnPodResult = v1alpha2.PolicyReportResult{
ID: "124",
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Result: v1alpha2.StatusWarn,
Category: "Best Practices",
Scored: true,
Source: "Kyverno",
Resources: []corev1.ObjectReference{{
APIVersion: "v1",
Kind: "Pod",
Name: "nginx",
Namespace: "test",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188419",
}},
Properties: map[string]string{},
}

var ErrorPodResult = v1alpha2.PolicyReportResult{
ID: "124",
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Result: v1alpha2.StatusError,
Category: "Best Practices",
Scored: true,
Source: "Kyverno",
Resources: []corev1.ObjectReference{{
APIVersion: "v1",
Kind: "Pod",
Name: "nginx",
Namespace: "test",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188419",
}},
Properties: map[string]string{},
}

var TrivyResult = v1alpha2.PolicyReportResult{
ID: "124",
Message: "validation error",
Expand Down Expand Up @@ -112,6 +150,23 @@ var FailPodResult = v1alpha2.PolicyReportResult{
}},
}

var SkipPodResult = v1alpha2.PolicyReportResult{
ID: "124",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Result: v1alpha2.StatusSkip,
Category: "Best Practices",
Scored: true,
Source: "Kyverno",
Resources: []corev1.ObjectReference{{
APIVersion: "v1",
Kind: "Pod",
Name: "nginx",
Namespace: "test",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188419",
}},
}

var FailResultWithoutResource = v1alpha2.PolicyReportResult{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Expand Down
18 changes: 18 additions & 0 deletions pkg/helper/chunk_slice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package helper_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/kyverno/policy-reporter/pkg/helper"
)

func TestChunkSize(t *testing.T) {
chunks := helper.ChunkSlice([]int{1, 2, 3, 4, 5, 6, 7}, 3)

assert.Len(t, chunks, 3)
assert.Equal(t, []int{1, 2, 3}, chunks[0])
assert.Equal(t, []int{4, 5, 6}, chunks[1])
assert.Equal(t, []int{7}, chunks[2])
}
14 changes: 14 additions & 0 deletions pkg/helper/title_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package helper_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/kyverno/policy-reporter/pkg/helper"
)

func TestTitle(t *testing.T) {
assert.Equal(t, "Kyverno", helper.Title("kyverno"))
assert.Equal(t, "Trivy Vulnerability", helper.Title("trivy vulnerability"))
}
2 changes: 1 addition & 1 deletion pkg/kubernetes/policy_report_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (k *k8sPolicyReportClient) Sync(stopper chan struct{}) error {

k.synced = true

zap.L().Info("informer sync completed")
zap.L().Info("policy report informer sync completed")

return nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/listener/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ func Test_CleanupListener(t *testing.T) {
assert.True(t, c.cleanupCalled, "expected cleanup method was called")
})
}

func Test_Cleanup_Listener_Skip_Added(t *testing.T) {
t.Run("Execute Cleanup Handler", func(t *testing.T) {
c := &client{cleanup: true}

slistener := listener.NewCleanupListener(ctx, target.NewCollection(&target.Target{Client: c}))
slistener(report.LifecycleEvent{Type: report.Added, PolicyReport: preport1})

assert.False(t, c.cleanupCalled, "expected cleanup execution was skipped")
})
}
19 changes: 19 additions & 0 deletions pkg/listener/fixture_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package listener_test

import (
"time"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kyverno/policy-reporter/pkg/crd/api/policyreport/v1alpha2"
"github.com/kyverno/policy-reporter/pkg/fixtures"
)

var scopereport1 = &v1alpha2.PolicyReport{
ObjectMeta: v1.ObjectMeta{
Name: "polr-test",
Namespace: "test",
CreationTimestamp: v1.NewTime(time.Now().Add(time.Hour)),
},
Scope: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: "test",
Namespace: "test",
},
Results: []v1alpha2.PolicyReportResult{fixtures.FailResult},
Summary: v1alpha2.PolicyReportSummary{Fail: 1},
}

var preport1 = &v1alpha2.PolicyReport{
ObjectMeta: v1.ObjectMeta{
Name: "polr-test",
Expand Down
Loading

0 comments on commit ee11c57

Please sign in to comment.