-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathscheduling_queue.go
427 lines (389 loc) · 14.1 KB
/
scheduling_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains the scheduling queue used by the queuejob controller.
// Scheduling queues hold AppWrappers (QJs) waiting to be scheduled. The only
// implementation in this file is a priority queue (PriorityQueue) with two
// sub-queues. One sub-queue holds QJs that are being considered for
// scheduling; it is called activeQ. The other holds QJs that have already
// been tried and were determined to be unschedulable; it is called
// unschedulableQ.
// The design is adapted from the Kubernetes scheduler's pod scheduling queue,
// which also offered a FIFO for flag-gating when util.PodPriorityEnabled()
// returned false. No FIFO is implemented here; NewSchedulingQueue always
// returns a PriorityQueue.
package queuejob
import (
"fmt"
"reflect"
"sync"
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
qjobv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// SchedulingQueue stores AppWrappers (QJs) that are waiting to be scheduled.
// Its shape mirrors cache.FIFO and cache.Heap, so those data structures are
// straightforward to adapt into a SchedulingQueue.
type SchedulingQueue interface {
	// Insertion.
	Add(qj *qjobv1.AppWrapper) error
	AddIfNotPresent(qj *qjobv1.AppWrapper) error
	AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) error

	// Removal and in-place mutation.
	Pop() (*qjobv1.AppWrapper, error)
	Update(oldQJ, newQJ *qjobv1.AppWrapper) error
	Delete(QJ *qjobv1.AppWrapper) error

	// Moving QJs between sub-queues.
	MoveToActiveQueueIfExists(QJ *qjobv1.AppWrapper) error
	MoveAllToActiveQueue()

	// Membership and size queries.
	IfExist(QJ *qjobv1.AppWrapper) bool
	IfExistActiveQ(QJ *qjobv1.AppWrapper) bool
	IfExistUnschedulableQ(QJ *qjobv1.AppWrapper) bool
	Length() int
}
// NewSchedulingQueue initializes a new scheduling queue. It always returns a
// *PriorityQueue; there is no FIFO fallback in this package, regardless of
// whether pod priority is enabled.
func NewSchedulingQueue() SchedulingQueue {
	return NewPriorityQueue()
}
// UnschedulableQJs is an interface for a store that keeps unschedulable QJs.
// QJs held there are not actively re-evaluated for scheduling; they are moved
// to the active scheduling queue on certain events (see
// PriorityQueue.MoveToActiveQueueIfExists and MoveAllToActiveQueue).
//
// NOTE(review): this has exactly the same method set as
// UnschedulableQueueJobs declared later in this file; one of the two is
// likely redundant — confirm which one external callers use.
type UnschedulableQJs interface {
	Add(p *qjobv1.AppWrapper)
	Delete(p *qjobv1.AppWrapper)
	Update(p *qjobv1.AppWrapper)
	Get(p *qjobv1.AppWrapper) *qjobv1.AppWrapper
	Clear()
}
// PriorityQueue is a SchedulingQueue built from two sub-queues:
//
//   - activeQ: a Heap the scheduler actively draws from. Its head is the
//     highest-priority pending QJ.
//   - unschedulableQ: QJs that were already tried and found unschedulable.
//
// The Heap is thread safe on its own, but lock is still needed so that
// operations touching both sub-queues are atomic.
type PriorityQueue struct {
	// lock guards activeQ, unschedulableQ and receivedMoveRequest.
	lock sync.RWMutex
	// cond is bound to lock (see NewPriorityQueue). It is broadcast when
	// activeQ may have gained items, so that a blocked Pop wakes up.
	cond sync.Cond

	activeQ        *Heap
	unschedulableQ *UnschedulableQJMap

	// receivedMoveRequest is set by MoveAllToActiveQueue and cleared by Pop.
	// While it is set, AddUnschedulableIfNotPresent sends QJs to activeQ
	// instead of unschedulableQ.
	receivedMoveRequest bool
}
// Compile-time check that *PriorityQueue implements SchedulingQueue. A typed
// nil pointer is enough for the check; no PriorityQueue value is constructed.
var _ SchedulingQueue = (*PriorityQueue)(nil)
// NewPriorityQueue returns an empty PriorityQueue. Its activeQ is keyed by
// cache.MetaNamespaceKeyFunc ("namespace/name") and ordered by
// HigherSystemPriorityQJ, and its condition variable is tied to its lock.
func NewPriorityQueue() *PriorityQueue {
	queue := new(PriorityQueue)
	queue.activeQ = newHeap(cache.MetaNamespaceKeyFunc, HigherSystemPriorityQJ)
	queue.unschedulableQ = newUnschedulableQJMap()
	// Pop waits on cond while holding lock, so both must share one mutex.
	queue.cond.L = &queue.lock
	return queue
}
// Length reports how many QJs are currently in activeQ. QJs parked in
// unschedulableQ are not counted.
func (p *PriorityQueue) Length() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.activeQ.data.Len()
}
// IfExist reports whether qj is held in either activeQ or unschedulableQ.
// A lookup error from activeQ is logged. The exists flag returned alongside
// that error is still used as-is.
func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool {
	p.lock.Lock()
	defer p.lock.Unlock()

	_, inActive, lookupErr := p.activeQ.Get(qj)
	if lookupErr != nil {
		klog.Errorf("[IfExist] unable to check if app wrapper exists, - error:%#v", lookupErr)
	}
	inUnschedulable := p.unschedulableQ.Get(qj) != nil
	return inUnschedulable || inActive
}
// IfExistActiveQ reports whether qj is currently held in activeQ. A lookup
// error is logged, not returned.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool {
	p.lock.Lock()
	defer p.lock.Unlock()

	_, found, lookupErr := p.activeQ.Get(qj)
	if lookupErr != nil {
		klog.Errorf("[IfExistActiveQ] unable to check if app wrapper exists, - error:%#v", lookupErr)
	}
	return found
}
// IfExistUnschedulableQ reports whether qj is currently parked in
// unschedulableQ.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) IfExistUnschedulableQ(qj *qjobv1.AppWrapper) bool {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.unschedulableQ.Get(qj) != nil
}
// MoveToActiveQueueIfExists moves aw from unschedulableQ to activeQ if it is
// currently parked in unschedulableQ. Otherwise it does nothing and returns
// nil.
//
// The unschedulableQ entry is removed only after activeQ has accepted aw. If
// the insert fails, the QJ stays in unschedulableQ instead of being dropped
// from both sub-queues, and the error is returned. Waiters in Pop are woken
// only on success.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) MoveToActiveQueueIfExists(aw *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.unschedulableQ.Get(aw) == nil {
		return nil
	}
	if err := p.activeQ.AddIfNotPresent(aw); err != nil {
		klog.Errorf("[MoveToActiveQueueIfExists] Error adding AW %s/%s to the scheduling queue: %v\n", aw.Namespace, aw.Name, err)
		return err
	}
	p.unschedulableQ.Delete(aw)
	p.cond.Broadcast()
	return nil
}
// Add inserts qj into activeQ. It is meant for brand-new QJs only. If qj is
// nevertheless found in unschedulableQ after a successful insert, that is
// logged as an error and the stale unschedulableQ entry is removed. Waiters
// in Pop are woken only when the insert succeeds; the insert error, if any,
// is logged and returned.
func (p *PriorityQueue) Add(qj *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if err := p.activeQ.Add(qj); err != nil {
		klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
		return err
	}
	if p.unschedulableQ.Get(qj) != nil {
		klog.Errorf("Error: QJ %s/%s is already in the unschedulable queue.", qj.Namespace, qj.Name)
		p.unschedulableQ.Delete(qj)
	}
	p.cond.Broadcast()
	return nil
}
// AddIfNotPresent adds qj to activeQ unless it is already present in either
// sub-queue. If it is present, the call does nothing and returns nil.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.unschedulableQ.Get(qj) != nil {
		return nil
	}
	_, exists, err := p.activeQ.Get(qj)
	if err != nil {
		// Log lookup failures regardless of exists; otherwise they are silently
		// lost. The error is non-fatal and the insert below is still attempted.
		klog.Errorf("[AddIfNotPresent] unable to check if pod exists, - error:%#v", err)
	}
	if exists {
		return nil
	}
	if err = p.activeQ.Add(qj); err != nil {
		klog.Errorf("[AddIfNotPresent] Error adding pod %s/%s to the scheduling queue, - error:%#v", qj.Namespace, qj.Name, err)
		return err
	}
	p.cond.Broadcast()
	return nil
}
// AddUnschedulableIfNotPresent returns an error if qj is already present in
// either sub-queue. Otherwise it adds qj to unschedulableQ when
// p.receivedMoveRequest is false, or to activeQ when p.receivedMoveRequest is
// true (i.e. MoveAllToActiveQueue ran since the last Pop).
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.unschedulableQ.Get(qj) != nil {
		return fmt.Errorf("pod is already present in unschedulableQ")
	}
	_, exists, err := p.activeQ.Get(qj)
	if err != nil {
		// Log lookup failures regardless of exists; otherwise they are silently
		// lost.
		klog.Errorf("[AddUnschedulableIfNotPresent] unable to check if pod exists, - error:%#v", err)
	}
	if exists {
		return fmt.Errorf("pod is already present in the activeQ")
	}

	// NOTE(review): an additional isPodUnschedulable(qj) condition was
	// commented out here; confirm it is intentionally disabled.
	// if !p.receivedMoveRequest && isPodUnschedulable(qj) {
	if !p.receivedMoveRequest {
		p.unschedulableQ.Add(qj)
		return nil
	}

	if err = p.activeQ.Add(qj); err != nil {
		klog.Errorf("[AddUnschedulableIfNotPresent] Error adding QJ %s/%s to the scheduling queue: %v", qj.Namespace, qj.Name, err)
		return err
	}
	p.cond.Broadcast()
	return nil
}
// Pop blocks until activeQ is non-empty, then removes and returns its head
// (the highest-priority QJ). After a successful pop it clears
// receivedMoveRequest, marking the start of a new scheduling cycle.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) Pop() (*qjobv1.AppWrapper, error) {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Re-check after every wake-up. Broadcast can wake several waiters, and
	// another one may already have drained the queue.
	for len(p.activeQ.data.queue) == 0 {
		p.cond.Wait()
	}

	head, popErr := p.activeQ.Pop()
	if popErr != nil {
		return nil, popErr
	}
	popped := head.(*qjobv1.AppWrapper)
	p.receivedMoveRequest = false
	return popped, nil
}
// isQJUpdated reports whether newQJ differs from oldQJ in any field other than
// ResourceVersion and Generation. Those two are cleared on deep copies before
// the objects are compared with reflect.DeepEqual, so the inputs themselves
// are not modified. Any other difference counts as an update that may have
// made the QJ schedulable.
//
// NOTE(review): unlike the upstream pod variant, Status is NOT stripped here,
// so a status-only change also reports true. Confirm this is intended.
func (p *PriorityQueue) isQJUpdated(oldQJ, newQJ *qjobv1.AppWrapper) bool {
	strip := func(qj *qjobv1.AppWrapper) *qjobv1.AppWrapper {
		// Named cp rather than p so the receiver is not shadowed.
		cp := qj.DeepCopy()
		cp.ResourceVersion = ""
		cp.Generation = 0
		return cp
	}
	return !reflect.DeepEqual(strip(oldQJ), strip(newQJ))
}
// Update refreshes a QJ held by the queue:
//   - if newQJ is in activeQ, it is updated there in place;
//   - if it is in unschedulableQ and isQJUpdated reports a meaningful change,
//     it is moved to activeQ; otherwise the unschedulableQ entry is replaced;
//   - if it is in neither sub-queue, it is added to activeQ.
//
// When moving from unschedulableQ, the activeQ insert happens first. If it
// fails, newQJ is kept in unschedulableQ instead of being lost from both
// sub-queues. Waiters in Pop are woken only after a successful insert.
func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	_, exists, err := p.activeQ.Get(newQJ)
	if err != nil {
		// Log lookup failures regardless of exists; otherwise they are silently
		// lost.
		klog.Errorf("[Update] unable to check if pod exists, - error:%#v", err)
	}
	if exists {
		if err = p.activeQ.Update(newQJ); err != nil {
			klog.Errorf("[Update] unable to update pod, - error: %#v", err)
		}
		return err
	}

	if usQJ := p.unschedulableQ.Get(newQJ); usQJ != nil {
		if !p.isQJUpdated(oldQJ, newQJ) {
			p.unschedulableQ.Update(newQJ)
			return nil
		}
		if err = p.activeQ.Add(newQJ); err != nil {
			klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
			p.unschedulableQ.Update(newQJ)
			return err
		}
		p.unschedulableQ.Delete(usQJ)
		p.cond.Broadcast()
		return nil
	}

	if err = p.activeQ.Add(newQJ); err != nil {
		klog.Errorf("Error adding QJ %s/%s to the scheduling queue: %v", newQJ.Namespace, newQJ.Name, err)
		return err
	}
	p.cond.Broadcast()
	return nil
}
// Delete removes qj from the queue. The unschedulableQ entry (if any) is
// always removed. activeQ is touched only when Get reports qj as present,
// and the error from activeQ.Delete is returned. A lookup error from Get is
// logged, not returned.
// Used by queuejob_controller_ex.go.
func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.unschedulableQ.Delete(qj)

	_, exists, err := p.activeQ.Get(qj)
	if err != nil {
		// Log lookup failures regardless of exists; otherwise they are silently
		// lost.
		klog.Errorf("[Delete] unable to check if pod exists - error: %#v", err)
	}
	if !exists {
		return nil
	}
	return p.activeQ.Delete(qj)
}
// MoveAllToActiveQueue moves every QJ from unschedulableQ into activeQ. All
// QJs are added in one BulkAdd before the condition variable is signalled.
// This way a waiting Pop sees them all at once and takes the
// highest-priority one. It also sets receivedMoveRequest so that QJs
// reported unschedulable during the current cycle go straight to activeQ
// (see AddUnschedulableIfNotPresent).
//
// TODO(bsalamat): We should add a back-off mechanism here so that a high priority
// pod which is unschedulable does not go to the head of the queue frequently. For
// example in a cluster where a lot of pods being deleted, such a high priority
// pod can deprive other pods from getting scheduled.
func (p *PriorityQueue) MoveAllToActiveQueue() {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Pre-size: the number of QJs to move is known up front.
	unschedulableQJs := make([]*arbv1.AppWrapper, 0, len(p.unschedulableQ.pods))
	for _, qj := range p.unschedulableQ.pods {
		unschedulableQJs = append(unschedulableQJs, qj)
	}
	p.activeQ.BulkAdd(unschedulableQJs)
	p.unschedulableQ.Clear()
	p.receivedMoveRequest = true
	p.cond.Broadcast()
}
// UnschedulableQJMap holds QJs that have been tried and found unschedulable.
// It backs PriorityQueue.unschedulableQ. It has no internal locking;
// PriorityQueue only accesses it while holding its own lock.
type UnschedulableQJMap struct {
	// pods maps a QJ's full name (as returned by GetXQJFullName) to the QJ.
	pods map[string]*qjobv1.AppWrapper
	// keyFunc is set to GetXQJFullName by newUnschedulableQJMap.
	// NOTE(review): the methods of this type call GetXQJFullName directly and
	// never read keyFunc; confirm whether it is used elsewhere.
	keyFunc func(*qjobv1.AppWrapper) string
}
// UnschedulableQueueJobs describes a store for QJs that were found
// unschedulable. UnschedulableQJMap implements it.
//
// NOTE(review): this has exactly the same method set as UnschedulableQJs;
// one of the two interfaces is likely redundant.
type UnschedulableQueueJobs interface {
	Add(pod *qjobv1.AppWrapper)
	Delete(pod *qjobv1.AppWrapper)
	Update(pod *qjobv1.AppWrapper)
	Get(pod *qjobv1.AppWrapper) *qjobv1.AppWrapper
	Clear()
}

// Compile-time check that *UnschedulableQJMap implements
// UnschedulableQueueJobs, without constructing a value.
var _ UnschedulableQueueJobs = (*UnschedulableQJMap)(nil)
// Add stores pod under its full name. If an entry with the same key already
// exists, it is left untouched; use Update to replace it.
func (u *UnschedulableQJMap) Add(pod *qjobv1.AppWrapper) {
	key := GetXQJFullName(pod)
	if _, found := u.pods[key]; found {
		return
	}
	u.pods[key] = pod
}
// Delete removes the entry keyed by pod's full name. Removing a key that is
// not present is a no-op, because the built-in delete tolerates missing keys.
func (u *UnschedulableQJMap) Delete(pod *qjobv1.AppWrapper) {
	delete(u.pods, GetXQJFullName(pod))
}
// Update stores pod under its full name, inserting it if absent and replacing
// any existing entry otherwise.
func (u *UnschedulableQJMap) Update(pod *qjobv1.AppWrapper) {
	u.pods[GetXQJFullName(pod)] = pod
}
// Get returns the stored QJ whose key matches pod's full name, or nil if
// there is none. A map lookup of a missing key yields the zero value, which
// is nil for a pointer.
func (u *UnschedulableQJMap) Get(pod *qjobv1.AppWrapper) *qjobv1.AppWrapper {
	return u.pods[GetXQJFullName(pod)]
}
// Clear discards every entry by replacing the map with a fresh, empty one.
func (u *UnschedulableQJMap) Clear() {
	u.pods = map[string]*qjobv1.AppWrapper{}
}
// newUnschedulableQJMap returns an empty UnschedulableQJMap with its keyFunc
// set to GetXQJFullName.
func newUnschedulableQJMap() *UnschedulableQJMap {
	m := &UnschedulableQJMap{keyFunc: GetXQJFullName}
	m.pods = map[string]*qjobv1.AppWrapper{}
	return m
}