-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.go
140 lines (124 loc) · 3.02 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"github.com/utilitywarehouse/semaphore-service-mirror/log"
"github.com/utilitywarehouse/semaphore-service-mirror/metrics"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// queueReconcileFunc reconciles the object indicated by the name and namespace
type queueReconcileFunc func(name, namespace string) error
// queue provides a rate-limited queue that processes items with a provided
// reconcile function
type queue struct {
name string
reconcileFunc queueReconcileFunc
queue workqueue.RateLimitingInterface
requeued []string
}
// newQueue returns a new queue
func newQueue(name string, reconcileFunc queueReconcileFunc) *queue {
return &queue{
name: name,
reconcileFunc: reconcileFunc,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
}
}
// Add an item to the queue, where that item is an object that
// implements meta.Interface.
func (q *queue) Add(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Logger.Error("couldn't create object key", "queue", q.name, "err", err)
return
}
q.queue.Add(key)
}
// Run processes items from the queue as they're added
func (q *queue) Run() {
q.updateMetrics()
for q.processItem() {
q.updateMetrics()
}
}
// Stop causes the queue to shut down
func (q *queue) Stop() {
q.queue.ShutDown()
}
// processItem processes the next item in the queue
func (q *queue) processItem() bool {
key, shutdown := q.queue.Get()
if shutdown {
log.Logger.Info("queue shutdown", "queue", q.name)
return false
}
defer q.queue.Done(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
log.Logger.Error(
"error parsing key",
"queue", q.name,
"key", key.(string),
"err", err,
)
q.forget(key)
return true
}
log.Logger.Info(
"reconciling item",
"queue", q.name,
"namespace", namespace,
"name", name,
)
if err := q.reconcileFunc(name, namespace); err != nil {
log.Logger.Error(
"reconcile error",
"queue", q.name,
"namespace", namespace,
"name", name,
"err", err,
)
q.requeue(key)
log.Logger.Info(
"requeued item",
"queue", q.name,
"namespace", namespace,
"name", name,
)
} else {
log.Logger.Info(
"successfully reconciled item",
"queue", q.name,
"namespace", namespace,
"name", name,
)
q.forget(key)
}
return true
}
func (q *queue) requeue(key interface{}) {
q.queue.AddRateLimited(key)
q.addRequeued(key.(string))
}
func (q *queue) forget(key interface{}) {
q.queue.Forget(key)
q.removeRequeued(key.(string))
}
func (q *queue) addRequeued(key string) {
for _, k := range q.requeued {
if k == key {
return
}
}
q.requeued = append(q.requeued, key)
}
func (q *queue) removeRequeued(key string) {
for i, k := range q.requeued {
if k == key {
q.requeued = append(q.requeued[:i], q.requeued[i+1:]...)
break
}
}
}
func (q *queue) updateMetrics() {
metrics.SetRequeued(q.name, float64(len(q.requeued)))
}