// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

// Package controller contains a reusable abstraction for efficiently
// watching for changes in resources in a Kubernetes cluster.
package controller

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// Controller is a generic cache.Controller implementation that watches
// Kubernetes for changes to a specific set of resources and calls the
// configured callbacks as data changes.
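//
// A minimal usage sketch (serviceResource is hypothetical and assumed to
// implement this package's Resource interface):
//
//	ctrl := &Controller{
//		Log:      hclog.Default().Named("controller"),
//		Resource: serviceResource,
//	}
//	stopCh := make(chan struct{})
//	go ctrl.Run(stopCh)
//	// ... later, close(stopCh) to stop the controller.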
type Controller struct {
	Log      hclog.Logger
	Resource Resource

	informer cache.SharedIndexInformer
}

// Event is something that occurred to the resources we're watching.
type Event struct {
	// Key is in the form of <namespace>/<name>, e.g. default/pod-abc123,
	// and corresponds to the resource modified.
	Key string

	// Obj holds the resource that was modified at the time of the event.
	// If possible, the resource should be retrieved from the informer cache
	// instead of from this field, because the cache will be more up to date
	// at the time the event is processed.
	// In some cases, such as a delete event, the resource will no longer
	// exist in the cache, and then it is useful to have the resource here.
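	//
	// For example, an update to Pod "web-0" in namespace "default" is
	// queued as Event{Key: "default/web-0", Obj: <the updated Pod object>}.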
	Obj interface{}
}

// Run starts the Controller and blocks until stopCh is closed.
//
// Important: Callers must ensure that Run is only called once at a time.
func (c *Controller) Run(stopCh <-chan struct{}) {
	// Properly handle any panics
	defer utilruntime.HandleCrash()

	// Create an informer so we can keep track of all resource changes.
	informer := c.Resource.Informer()
	c.informer = informer

	// Create a queue for storing items to process from the informer.
	var queueOnce sync.Once
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	shutdown := func() { queue.ShutDown() }
	defer queueOnce.Do(shutdown)

	// Add event handlers for when data is received from the informer. The
	// handlers here would block the informer, so we just offload the work
	// immediately into the workqueue.
	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Convert the resource object into a key (in this case in the
			// format 'namespace/name').
			key, err := cache.MetaNamespaceKeyFunc(obj)
			c.Log.Debug("queue", "op", "add", "key", key)
			if err == nil {
				queue.Add(Event{Key: key, Obj: obj})
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(newObj)
			c.Log.Debug("queue", "op", "update", "key", key)
			if err == nil {
				queue.Add(Event{Key: key, Obj: newObj})
			}
		},
		DeleteFunc: c.informerDeleteHandler(queue),
	})
	if err != nil {
		// hclog expects alternating key/value pairs after the message.
		c.Log.Error("error adding informer event handlers", "error", err)
	}

	// If the resource is a Backgrounder, then start up its background
	// process.
	if bg, ok := c.Resource.(Backgrounder); ok {
		ctx, cancelF := context.WithCancel(context.Background())

		// Run the backgrounder
		doneCh := make(chan struct{})
		go func() {
			defer close(doneCh)
			bg.Run(ctx.Done())
		}()

		// Start a goroutine that automatically closes the context when we stop
		go func() {
			select {
			case <-stopCh:
				cancelF()

			case <-ctx.Done():
				// Cancelled outside
			}
		}()

		// When we exit, close the context so the backgrounder ends
		defer func() {
			cancelF()
			<-doneCh
		}()
	}

	// Run the informer to start requesting resources
	go func() {
		informer.Run(stopCh)

		// We have to shut down the queue here if we stop so that
		// wait.Until stops below too. We can't wait until the defer at
		// the top since wait.Until will block.
		queueOnce.Do(shutdown)
	}()

	// Initial sync
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("error syncing cache"))
		return
	}
	c.Log.Debug("initial cache sync complete")

	// Run the worker loop, restarting it every second, until stopCh is
	// closed.
	wait.Until(func() {
		for c.processSingle(queue, informer) {
			// Process
		}
	}, time.Second, stopCh)
}
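
// A minimal Backgrounder sketch (hypothetical; inferred from the
// bg.Run(ctx.Done()) call in Run above, the interface is assumed to require
// only a Run(<-chan struct{}) method):
//
//	type periodicSyncer struct {
//		log hclog.Logger
//	}
//
//	func (s *periodicSyncer) Run(stopCh <-chan struct{}) {
//		ticker := time.NewTicker(30 * time.Second)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-ticker.C:
//				s.log.Debug("periodic sync tick")
//			case <-stopCh:
//				return
//			}
//		}
//	}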

// HasSynced implements cache.Controller.
func (c *Controller) HasSynced() bool {
	if c.informer == nil {
		return false
	}

	return c.informer.HasSynced()
}

// LastSyncResourceVersion implements cache.Controller.
func (c *Controller) LastSyncResourceVersion() string {
	if c.informer == nil {
		return ""
	}

	return c.informer.LastSyncResourceVersion()
}

// processSingle processes a single event from the queue. It returns false
// when the queue has been shut down and the worker should exit.
func (c *Controller) processSingle(
	queue workqueue.RateLimitingInterface,
	informer cache.SharedIndexInformer,
) bool {
	// Fetch the next item
	rawEvent, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(rawEvent)

	event, ok := rawEvent.(Event)
	if !ok {
		c.Log.Warn("processSingle: dropping event with unexpected type", "event", rawEvent)
		return true
	}

	// Get the item from the informer to ensure we have the most up-to-date
	// copy.
	key := event.Key
	item, exists, err := informer.GetIndexer().GetByKey(key)

	// If we got the item successfully, call the proper method
	if err == nil {
		c.Log.Debug("processing object", "key", key, "exists", exists)
		c.Log.Trace("processing object", "object", item)
		if !exists {
			// In the case of deletes, the item is no longer in the cache so
			// we use the copy we got at the time of the event (event.Obj).
			err = c.Resource.Delete(key, event.Obj)
		} else {
			err = c.Resource.Upsert(key, item)
		}

		if err == nil {
			queue.Forget(rawEvent)
		}
	}
	if err != nil {
		if queue.NumRequeues(rawEvent) < 5 {
			c.Log.Error("failed processing item, retrying", "key", key, "error", err)
			queue.AddRateLimited(rawEvent)
		} else {
			c.Log.Error("failed processing item, no more retries", "key", key, "error", err)
			queue.Forget(rawEvent)
			utilruntime.HandleError(err)
		}
	}

	return true
}

// GetByIndex queries the informer's indexer, avoiding extra calls to the
// Kubernetes API. It returns nil until Run has created the informer.
func (c *Controller) GetByIndex(indexName, indexedValue string) ([]interface{}, error) {
	if c.informer == nil {
		return nil, nil
	}

	return c.informer.GetIndexer().ByIndex(indexName, indexedValue)
}
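
// A usage sketch for GetByIndex (the "node" index name and its value are
// hypothetical; the index must have been registered on the informer's
// indexer by the Resource implementation for this to return anything):
//
//	objs, err := ctrl.GetByIndex("node", "worker-1")
//	if err != nil {
//		return err
//	}
//	for _, obj := range objs {
//		if pod, ok := obj.(*corev1.Pod); ok {
//			// ... process pod (assumes the k8s.io/api/core/v1 import).
//			_ = pod
//		}
//	}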

// informerDeleteHandler returns a function that implements DeleteFunc
// from the cache.ResourceEventHandlerFuncs struct.
// It is split out as its own method to aid in testing.
func (c *Controller) informerDeleteHandler(queue workqueue.RateLimitingInterface) func(obj interface{}) {
	return func(obj interface{}) {
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
		c.Log.Debug("queue", "op", "delete", "key", key)
		if err == nil {
			// obj might be of type cache.DeletedFinalStateUnknown,
			// in which case we need to extract the object from
			// within that struct.
			if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				queue.Add(Event{Key: key, Obj: d.Obj})
			} else {
				queue.Add(Event{Key: key, Obj: obj})
			}
		}
	}
}
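
// A sketch of exercising the handler in a test (hypothetical; deletedPod is
// a stand-in for the last known state of a deleted object, and queue is any
// workqueue.RateLimitingInterface):
//
//	handler := c.informerDeleteHandler(queue)
//	handler(cache.DeletedFinalStateUnknown{
//		Key: "default/web-0",
//		Obj: deletedPod,
//	})
//	// The queue should now contain Event{Key: "default/web-0", Obj: deletedPod}.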