Skip to content

Commit a82ca18

Browse files
committed
Implement a (broken) informer/watcher
1 parent 27c825c commit a82ca18

File tree

3 files changed

+95
-10
lines changed

3 files changed

+95
-10
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
.glide/
1515

1616
/kubeconfig
17-
/main
17+
/tgik-controller
1818

1919
/bin
2020
/.go

controller.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"k8s.io/apimachinery/pkg/util/runtime"
7+
"k8s.io/client-go/informers"
8+
"k8s.io/client-go/kubernetes"
9+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
10+
listercorev1 "k8s.io/client-go/listers/core/v1"
11+
"k8s.io/client-go/tools/cache"
12+
)
13+
14+
type TGIKController struct {
15+
podGetter corev1.PodsGetter
16+
podLister listercorev1.PodLister
17+
podListerSynced cache.InformerSynced
18+
}
19+
20+
func NewTGIKController(client *kubernetes.Clientset, informerFactory informers.SharedInformerFactory) *TGIKController {
21+
informer := informerFactory.Core().V1().Pods()
22+
c := &TGIKController{
23+
podGetter: client.CoreV1(),
24+
podLister: informer.Lister(),
25+
podListerSynced: informer.Informer().HasSynced,
26+
}
27+
28+
informer.Informer().AddEventHandler(
29+
cache.ResourceEventHandlerFuncs{
30+
AddFunc: func(obj interface{}) {
31+
c.onAdd(obj)
32+
},
33+
UpdateFunc: func(oldObj, newObj interface{}) {
34+
c.onUpdate(oldObj, newObj)
35+
},
36+
DeleteFunc: func(obj interface{}) {
37+
c.onDelete(obj)
38+
},
39+
},
40+
)
41+
42+
return c
43+
}
44+
45+
func (c *TGIKController) Run(stop <-chan struct{}) {
46+
log.Print("waiting for cache sync")
47+
if !cache.WaitForCacheSync(stop, c.podListerSynced) {
48+
log.Print("timed out waiting for cache sync")
49+
return
50+
}
51+
log.Print("caches are synced")
52+
53+
// wait until we're told to stop
54+
log.Print("waiting for stop signal")
55+
<-stop
56+
log.Print("received stop signal")
57+
}
58+
59+
func (c *TGIKController) onAdd(obj interface{}) {
60+
key, err := cache.MetaNamespaceKeyFunc(obj)
61+
if err != nil {
62+
log.Printf("onAdd: error getting key for %#v: %v", obj, err)
63+
runtime.HandleError(err)
64+
}
65+
log.Printf("onAdd: %v", key)
66+
}
67+
68+
func (c *TGIKController) onUpdate(oldObj, _ interface{}) {
69+
key, err := cache.MetaNamespaceKeyFunc(oldObj)
70+
if err != nil {
71+
log.Printf("onUpdate: error getting key for %#v: %v", oldObj, err)
72+
runtime.HandleError(err)
73+
}
74+
log.Printf("onUpdate: %v", key)
75+
}
76+
77+
func (c *TGIKController) onDelete(obj interface{}) {
78+
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
79+
if err != nil {
80+
runtime.HandleError(err)
81+
}
82+
log.Printf("onDelete: %v", key)
83+
}

tgik-controller.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,20 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"log"
67
"os"
8+
"time"
79

8-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"github.com/jbeda/tgik-controller/version"
11+
12+
"k8s.io/client-go/informers"
913
"k8s.io/client-go/kubernetes"
1014
"k8s.io/client-go/rest"
1115
"k8s.io/client-go/tools/clientcmd"
1216
)
1317

1418
func main() {
19+
log.Printf("tgik-controller version %s", version.VERSION)
1520
kubeconfig := ""
1621
flag.StringVar(&kubeconfig, "kubeconfig", kubeconfig, "kubeconfig file")
1722
flag.Parse()
@@ -32,12 +37,9 @@ func main() {
3237
os.Exit(1)
3338
}
3439
client := kubernetes.NewForConfigOrDie(config)
35-
list, err := client.CoreV1().Nodes().List(metav1.ListOptions{})
36-
if err != nil {
37-
fmt.Fprintf(os.Stderr, "error listing nodes: %v", err)
38-
os.Exit(1)
39-
}
40-
for _, node := range list.Items {
41-
fmt.Printf("Node: %s\n", node.Name)
42-
}
40+
41+
sharedInformers := informers.NewSharedInformerFactory(client, 10*time.Minute)
42+
43+
tgikController := NewTGIKController(client, sharedInformers)
44+
tgikController.Run(nil)
4345
}

0 commit comments

Comments
 (0)