@@ -19,7 +19,14 @@ import (
19
19
"github.com/pkg/errors"
20
20
"github.com/sirupsen/logrus"
21
21
authorizationv1 "k8s.io/api/authorization/v1"
22
+ v1 "k8s.io/api/core/v1"
22
23
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24
+ k8sruntime "k8s.io/apimachinery/pkg/runtime"
25
+ "k8s.io/apimachinery/pkg/watch"
26
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
27
+ "k8s.io/client-go/tools/cache"
28
+ toolswatch "k8s.io/client-go/tools/watch"
29
+ cloudproviderapi "k8s.io/cloud-provider/api"
23
30
logsapi "k8s.io/component-base/logs/api/v1"
24
31
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
25
32
"k8s.io/kubernetes/pkg/registry/core/node"
@@ -157,8 +164,36 @@ func scheduler(ctx context.Context, cfg *config.Control) error {
157
164
158
165
args := config .GetArgs (argsMap , cfg .ExtraSchedulerAPIArgs )
159
166
167
+ schedulerNodeReady := make (chan struct {})
168
+
169
+ go func () {
170
+ defer close (schedulerNodeReady )
171
+
172
+ apiReadyLoop:
173
+ for {
174
+ select {
175
+ case <- ctx .Done ():
176
+ return
177
+ case <- cfg .Runtime .APIServerReady :
178
+ break apiReadyLoop
179
+ case <- time .After (30 * time .Second ):
180
+ logrus .Infof ("Waiting for API server to become available to start kube-scheduler" )
181
+ }
182
+ }
183
+
184
+ // If we're running the embedded cloud controller, wait for it to untaint at least one
185
+ // node (usually, the local node) before starting the scheduler to ensure that it
186
+ // finds a node that is ready to run pods during its initial scheduling loop.
187
+ if ! cfg .DisableCCM {
188
+ logrus .Infof ("Waiting for untainted node" )
189
+ if err := waitForUntaintedNode (ctx , runtime .KubeConfigScheduler ); err != nil {
190
+ logrus .Fatalf ("failed to wait for untained node: %v" , err )
191
+ }
192
+ }
193
+ }()
194
+
160
195
logrus .Infof ("Running kube-scheduler %s" , config .ArgString (args ))
161
- return executor .Scheduler (ctx , cfg . Runtime . APIServerReady , args )
196
+ return executor .Scheduler (ctx , schedulerNodeReady , args )
162
197
}
163
198
164
199
func apiServer (ctx context.Context , cfg * config.Control ) error {
@@ -323,7 +358,6 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
323
358
"authentication-kubeconfig" : runtime .KubeConfigCloudController ,
324
359
"node-status-update-frequency" : "1m0s" ,
325
360
"bind-address" : cfg .Loopback (false ),
326
- "feature-gates" : "CloudDualStackNodeIPs=true" ,
327
361
}
328
362
if cfg .NoLeaderElect {
329
363
argsMap ["leader-elect" ] = "false"
@@ -359,7 +393,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
359
393
case <- cfg .Runtime .APIServerReady :
360
394
break apiReadyLoop
361
395
case <- time .After (30 * time .Second ):
362
- logrus .Infof ("Waiting for API server to become available" )
396
+ logrus .Infof ("Waiting for API server to become available to start cloud-controller-manager " )
363
397
}
364
398
}
365
399
@@ -449,3 +483,50 @@ func promise(f func() error) <-chan error {
449
483
}()
450
484
return c
451
485
}
486
+
487
+ // waitForUntaintedNode watches nodes, waiting to find one not tainted as
488
+ // uninitialized by the external cloud provider.
489
+ func waitForUntaintedNode (ctx context.Context , kubeConfig string ) error {
490
+
491
+ restConfig , err := util .GetRESTConfig (kubeConfig )
492
+ if err != nil {
493
+ return err
494
+ }
495
+ coreClient , err := typedcorev1 .NewForConfig (restConfig )
496
+ if err != nil {
497
+ return err
498
+ }
499
+ nodes := coreClient .Nodes ()
500
+
501
+ lw := & cache.ListWatch {
502
+ ListFunc : func (options metav1.ListOptions ) (object k8sruntime.Object , e error ) {
503
+ return nodes .List (ctx , options )
504
+ },
505
+ WatchFunc : func (options metav1.ListOptions ) (i watch.Interface , e error ) {
506
+ return nodes .Watch (ctx , options )
507
+ },
508
+ }
509
+
510
+ condition := func (ev watch.Event ) (bool , error ) {
511
+ if node , ok := ev .Object .(* v1.Node ); ok {
512
+ return getCloudTaint (node .Spec .Taints ) == nil , nil
513
+ }
514
+ return false , errors .New ("event object not of type v1.Node" )
515
+ }
516
+
517
+ if _ , err := toolswatch .UntilWithSync (ctx , lw , & v1.Node {}, nil , condition ); err != nil {
518
+ return errors .Wrap (err , "failed to wait for untainted node" )
519
+ }
520
+ return nil
521
+ }
522
+
523
+ // getCloudTaint returns the external cloud provider taint, if present.
524
+ // Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
525
+ func getCloudTaint (taints []v1.Taint ) * v1.Taint {
526
+ for _ , taint := range taints {
527
+ if taint .Key == cloudproviderapi .TaintExternalCloudProvider {
528
+ return & taint
529
+ }
530
+ }
531
+ return nil
532
+ }
0 commit comments