1
1
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
2
- index 198fdfb37..d534fc1ef 100644
2
+ index b9be94b6e..5efb40df2 100644
3
3
--- a/cluster-autoscaler/utils/kubernetes/listers.go
4
4
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
5
- @@ -17,14 +17,19 @@ limitations under the License.
5
+ @@ -17,10 +17,12 @@ limitations under the License.
6
6
package kubernetes
7
7
8
8
import (
9
9
+ "encoding/json"
10
10
"time"
11
11
12
- appsv1 "k8s.io/api/apps/v1"
13
- batchv1 "k8s.io/api/batch/v1"
14
12
apiv1 "k8s.io/api/core/v1"
15
13
policyv1 "k8s.io/api/policy/v1"
16
14
+ "k8s.io/apimachinery/pkg/api/resource"
17
- + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
15
"k8s.io/apimachinery/pkg/fields"
19
16
"k8s.io/apimachinery/pkg/labels"
20
- + "k8s.io/apimachinery/pkg/runtime"
21
- + "k8s.io/apimachinery/pkg/watch"
22
- client "k8s.io/client-go/kubernetes"
23
- v1appslister "k8s.io/client-go/listers/apps/v1"
24
- v1batchlister "k8s.io/client-go/listers/batch/v1"
25
- @@ -169,6 +174,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
26
- selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
27
- string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
28
- podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
29
- + podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
30
- store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
31
- podLister := v1lister.NewPodLister(store)
32
- go reflector.Run(stopchannel)
33
- @@ -212,6 +218,7 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
34
- selector := fields.ParseSelectorOrDie("status.phase!=" +
35
- string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
36
- podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
37
- + podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
38
- store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
39
- podLister := v1lister.NewPodLister(store)
40
- go reflector.Run(stopchannel)
41
- @@ -221,6 +228,105 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
42
- }
17
+ "k8s.io/client-go/informers"
18
+ @@ -46,6 +48,14 @@ type ListerRegistry interface {
19
+ StatefulSetLister() v1appslister.StatefulSetLister
43
20
}
44
21
45
22
+ // copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go.
@@ -50,97 +27,41 @@ index 198fdfb37..d534fc1ef 100644
50
27
+ Memory resource.Quantity `json:"memory"`
51
28
+ }
52
29
+
53
- + func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch {
54
- + updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) {
55
- + annotation, ok := pod.Annotations["vm.neon.tech/usage"]
56
- + if !ok {
57
- + return
58
- + }
59
- +
60
- + var usage virtualMachineUsage
61
- + if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
62
- + return
63
- + }
64
- +
65
- + pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
66
- + apiv1.ResourceCPU: usage.CPU,
67
- + apiv1.ResourceMemory: usage.Memory,
68
- + })
30
+ type listerRegistryImpl struct {
31
+ allNodeLister NodeLister
32
+ readyNodeLister NodeLister
33
+ @@ -221,6 +231,22 @@ type AllPodLister struct {
34
+ podLister v1lister.PodLister
35
+ }
36
+
37
+ + func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
38
+ + annotation, ok := pod.Annotations["vm.neon.tech/usage"]
39
+ + if !ok {
40
+ + return
69
41
+ }
70
42
+
71
- + return &cache.ListWatch{
72
- + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
73
- + obj, err := lw.List(options)
74
- + if err != nil {
75
- + return obj, err
76
- + }
77
- +
78
- + list := obj.(*apiv1.PodList)
79
- + for i := range list.Items {
80
- + updatePodRequestsFromNeonVMAnnotation(&list.Items[i])
81
- + }
82
- + return obj, nil
83
- + },
84
- + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
85
- + iface, err := lw.Watch(options)
86
- + if err != nil {
87
- + return iface, err
88
- + }
89
- +
90
- + // Wrap the channel to update the pods as they come through
91
- + wrappedEvents := make(chan watch.Event)
92
- + proxyIface := watch.NewProxyWatcher(wrappedEvents)
93
- +
94
- + go func() {
95
- + events := iface.ResultChan()
96
- +
97
- + for {
98
- + var ok bool
99
- + var ev watch.Event
100
- +
101
- + select {
102
- + case <-proxyIface.StopChan():
103
- + return
104
- + case ev, ok = <-events:
105
- + if !ok {
106
- + close(wrappedEvents)
107
- + return
108
- + }
109
- + }
110
- +
111
- + // Quoting the docs on watch.Event.Object:
112
- + //
113
- + // > Object is:
114
- + // > * If Type is Added or Modified: the new state of the object
115
- + // > * If type is Deleted: the state of the object immediately before deletion.
116
- + // > * If Type is Bookmark: the object [ ... ] where only ResourceVersion field
117
- + // > is set.
118
- + // > * If Type is Error: *api.Status is recommended; other types may make sense
119
- + // > depending on context.
120
- + //
121
- + // So basically, we want to process the object only if ev.Type is Added,
122
- + // Modified, or Deleted.
123
- + if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted {
124
- + pod := ev.Object.(*apiv1.Pod)
125
- + updatePodRequestsFromNeonVMAnnotation(pod)
126
- + }
127
- +
128
- + // Pass along the maybe-updated event
129
- + select {
130
- + case <-proxyIface.StopChan():
131
- + return
132
- + case wrappedEvents <- ev:
133
- + // continue on to next event
134
- + }
135
- + }
136
- + }()
137
- +
138
- + return proxyIface, nil
139
- + },
140
- + DisableChunking: lw.DisableChunking,
43
+ + var usage virtualMachineUsage
44
+ + if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
45
+ + return
141
46
+ }
47
+ + pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
48
+ + apiv1.ResourceCPU: usage.CPU,
49
+ + apiv1.ResourceMemory: usage.Memory,
50
+ + })
142
51
+ }
143
52
+
144
- // NodeLister lists nodes.
145
- type NodeLister interface {
146
- List() ([]*apiv1.Node, error)
53
+ // List returns all scheduled pods.
54
+ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
55
+ var pods []*apiv1.Pod
56
+ @@ -231,7 +257,10 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
57
+ }
58
+ for _, p := range allPods {
59
+ if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
60
+ - pods = append(pods, p)
61
+ + // We need to make a copy of the pod to avoid modifying the original pod, since *p is a pointer to the object in the informer cache.
62
+ + podCopy := p.DeepCopy()
63
+ + updatePodRequestsFromNeonVMAnnotation(podCopy)
64
+ + pods = append(pods, podCopy)
65
+ }
66
+ }
67
+ return pods, nil
0 commit comments