Skip to content

Commit 5f345f3

Browse files
authored
fix: apiserver watcher refresh shouldn't crash plugin manager (#576)
# Description This PR modifies the APIServer watcher to improve error handling and optimize performance: 1. **Error Handling**: Instead of returning a resolution error to the caller, the watcher now retries the host lookup; if the apiserver IP still cannot be resolved after 3 refreshes, it bails and returns the error to the watcher manager. 2. **Optimization**: Moved the hostname retrieval to the initialization function. This change ensures that the hostname is obtained once during initialization and not on every refresh, as the hostname does not change. ## Related Issue [#565](#565) If this pull request is related to any issue, please mention it here. Additionally, make sure that the issue is assigned to you before submitting this pull request. ## Checklist - [x] I have read the [contributing documentation](https://retina.sh/docs/contributing). - [x] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits. - [x] I have correctly attributed the author(s) of the code. - [x] I have tested the changes locally. - [x] I have followed the project's style guidelines. - [x] I have updated the documentation, if necessary. - [ ] I have added tests, if applicable. ## Screenshots (if applicable) or Testing Completed Please add any relevant screenshots or GIFs to showcase the changes made. ## Additional Notes Add any additional notes or context about the pull request here. --- Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project.
1 parent 06d9807 commit 5f345f3

File tree

5 files changed

+177
-114
lines changed

5 files changed

+177
-114
lines changed

pkg/managers/pluginmanager/pluginmanager_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ func TestNewManagerStart(t *testing.T) {
120120

121121
for _, tt := range tests {
122122
mgr, err := NewPluginManager(tt.cfg, tel, api.PluginName(tt.pluginName))
123+
mgr.watcherManager = setupWatcherManagerMock(gomock.NewController(t))
123124
require.Nil(t, err, "Expected nil but got error:%w", err)
124125
require.NotNil(t, mgr, "Expected mgr to be intialized but found nil")
125126
require.Condition(t, assert.Comparison(func() bool {

pkg/managers/watchermanager/watchermanager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (wm *WatcherManager) Start(ctx context.Context) error {
3636

3737
for _, w := range wm.Watchers {
3838
if err := w.Init(ctx); err != nil {
39-
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)))
39+
wm.l.Error("init failed", zap.String("watcher_type", fmt.Sprintf("%T", w)), zap.Error(err))
4040
return err
4141
}
4242
wm.wg.Add(1)

pkg/managers/watchermanager/watchermanager_test.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,28 @@ import (
1414
"golang.org/x/sync/errgroup"
1515
)
1616

17+
var errInitFailed = errors.New("init failed")
18+
1719
func TestStopWatcherManagerGracefully(t *testing.T) {
1820
ctl := gomock.NewController(t)
1921
defer ctl.Finish()
2022
log.SetupZapLogger(log.GetDefaultLogOpts())
2123
mgr := NewWatcherManager()
2224

25+
mockAPIServerWatcher := mock.NewMockIWatcher(ctl)
26+
mockEndpointWatcher := mock.NewMockIWatcher(ctl)
27+
28+
mgr.Watchers = []IWatcher{
29+
mockEndpointWatcher,
30+
mockAPIServerWatcher,
31+
}
32+
33+
mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()
34+
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()
35+
36+
mockEndpointWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()
37+
mockAPIServerWatcher.EXPECT().Stop(gomock.Any()).Return(nil).AnyTimes()
38+
2339
ctx, _ := context.WithCancel(context.Background())
2440
g, errctx := errgroup.WithContext(ctx)
2541

@@ -37,17 +53,17 @@ func TestWatcherInitFailsGracefully(t *testing.T) {
3753
defer ctl.Finish()
3854
log.SetupZapLogger(log.GetDefaultLogOpts())
3955

40-
mockApiServerWatcher := mock.NewMockIWatcher(ctl)
56+
mockAPIServerWatcher := mock.NewMockIWatcher(ctl)
4157
mockEndpointWatcher := mock.NewMockIWatcher(ctl)
4258

4359
mgr := NewWatcherManager()
4460
mgr.Watchers = []IWatcher{
45-
mockApiServerWatcher,
61+
mockAPIServerWatcher,
4662
mockEndpointWatcher,
4763
}
4864

49-
mockApiServerWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes()
50-
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errors.New("error")).AnyTimes()
65+
mockAPIServerWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes()
66+
mockEndpointWatcher.EXPECT().Init(gomock.Any()).Return(errInitFailed).AnyTimes()
5167

5268
err := mgr.Start(context.Background())
5369
require.NotNil(t, err, "Expected error when starting watcher manager")

pkg/watchers/apiserver/apiserver.go

+96-95
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,37 @@ import (
1515
"github.com/microsoft/retina/pkg/log"
1616
fm "github.com/microsoft/retina/pkg/managers/filtermanager"
1717
"github.com/microsoft/retina/pkg/pubsub"
18+
"github.com/microsoft/retina/pkg/utils"
1819
"go.uber.org/zap"
20+
"k8s.io/client-go/rest"
1921
kcfg "sigs.k8s.io/controller-runtime/pkg/client/config"
2022
)
2123

2224
const (
2325
filterManagerRetries = 3
26+
hostLookupRetries = 6 // 6 retries for a total of 63 seconds.
2427
)
2528

2629
type ApiServerWatcher struct {
27-
isRunning bool
28-
l *log.ZapLogger
29-
current cache
30-
new cache
31-
apiServerUrl string
32-
hostResolver IHostResolver
33-
filtermanager fm.IFilterManager
30+
isRunning bool
31+
l *log.ZapLogger
32+
current cache
33+
new cache
34+
apiServerHostName string
35+
hostResolver IHostResolver
36+
filterManager fm.IFilterManager
37+
restConfig *rest.Config
3438
}
3539

3640
var a *ApiServerWatcher
3741

38-
// Watcher creates a new apiserver watcher.
42+
// Watcher creates a new ApiServerWatcher instance.
3943
func Watcher() *ApiServerWatcher {
4044
if a == nil {
4145
a = &ApiServerWatcher{
4246
isRunning: false,
4347
l: log.Logger().Named("apiserver-watcher"),
4448
current: make(cache),
45-
apiServerUrl: getHostURL(),
4649
hostResolver: net.DefaultResolver,
4750
}
4851
}
@@ -56,12 +59,39 @@ func (a *ApiServerWatcher) Init(ctx context.Context) error {
5659
return nil
5760
}
5861

59-
a.filtermanager = getFilterManager()
62+
// Get filter manager.
63+
if a.filterManager == nil {
64+
var err error
65+
a.filterManager, err = fm.Init(filterManagerRetries)
66+
if err != nil {
67+
a.l.Error("failed to init filter manager", zap.Error(err))
68+
return fmt.Errorf("failed to init filter manager: %w", err)
69+
}
70+
}
71+
72+
// Get kubeconfig.
73+
if a.restConfig == nil {
74+
config, err := kcfg.GetConfig()
75+
if err != nil {
76+
a.l.Error("failed to get kubeconfig", zap.Error(err))
77+
return fmt.Errorf("failed to get kubeconfig: %w", err)
78+
}
79+
a.restConfig = config
80+
}
81+
82+
hostName, err := a.getHostName()
83+
if err != nil {
84+
a.l.Error("failed to get host name", zap.Error(err))
85+
return fmt.Errorf("failed to get host name: %w", err)
86+
}
87+
a.apiServerHostName = hostName
88+
6089
a.isRunning = true
90+
6191
return nil
6292
}
6393

64-
// Stop the apiserver watcher.
94+
// Stop stops the ApiServerWatcher.
6595
func (a *ApiServerWatcher) Stop(ctx context.Context) error {
6696
if !a.isRunning {
6797
a.l.Info("apiserver watcher is not running")
@@ -74,61 +104,57 @@ func (a *ApiServerWatcher) Stop(ctx context.Context) error {
74104
func (a *ApiServerWatcher) Refresh(ctx context.Context) error {
75105
err := a.initNewCache(ctx)
76106
if err != nil {
107+
a.l.Error("failed to initialize new cache", zap.Error(err))
77108
return err
78109
}
79-
// Compare the new ips with the old ones.
110+
111+
// Compare the new IPs with the old ones.
80112
created, deleted := a.diffCache()
81113

82-
// Publish the new ips.
83-
createdIps := []net.IP{}
84-
deletedIps := []net.IP{}
114+
createdIPs := []net.IP{}
115+
deletedIPs := []net.IP{}
85116

86117
for _, v := range created {
87-
a.l.Info("New Apiserver ips:", zap.Any("ip", v))
118+
a.l.Info("New Apiserver IPs:", zap.Any("ip", v))
88119
ip := net.ParseIP(v.(string)).To4()
89-
createdIps = append(createdIps, ip)
120+
createdIPs = append(createdIPs, ip)
90121
}
91122

92123
for _, v := range deleted {
93-
a.l.Info("Deleted Apiserver ips:", zap.Any("ip", v))
124+
a.l.Info("Deleted Apiserver IPs:", zap.Any("ip", v))
94125
ip := net.ParseIP(v.(string)).To4()
95-
deletedIps = append(deletedIps, ip)
126+
deletedIPs = append(deletedIPs, ip)
96127
}
97128

98-
if len(createdIps) > 0 {
99-
// Publish the new ips.
100-
a.publish(createdIps, cc.EventTypeAddAPIServerIPs)
101-
// Add ips to filter manager if any.
102-
err := a.filtermanager.AddIPs(createdIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"})
129+
if len(createdIPs) > 0 {
130+
a.publish(createdIPs, cc.EventTypeAddAPIServerIPs)
131+
err := a.filterManager.AddIPs(createdIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"})
103132
if err != nil {
104-
a.l.Error("Failed to add ips to filter manager", zap.Error(err))
133+
a.l.Error("Failed to add IPs to filter manager", zap.Error(err))
105134
}
106135
}
107136

108-
if len(deletedIps) > 0 {
109-
// Publish the deleted ips.
110-
a.publish(deletedIps, cc.EventTypeDeleteAPIServerIPs)
111-
// Delete ips from filter manager if any.
112-
err := a.filtermanager.DeleteIPs(deletedIps, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"})
137+
if len(deletedIPs) > 0 {
138+
a.publish(deletedIPs, cc.EventTypeDeleteAPIServerIPs)
139+
err := a.filterManager.DeleteIPs(deletedIPs, "apiserver-watcher", fm.RequestMetadata{RuleID: "apiserver-watcher"})
113140
if err != nil {
114-
a.l.Error("Failed to delete ips from filter manager", zap.Error(err))
141+
a.l.Error("Failed to delete IPs from filter manager", zap.Error(err))
115142
}
116143
}
117144

118-
// update the current cache and reset the new cache
119145
a.current = a.new.deepcopy()
120146
a.new = nil
121147

122148
return nil
123149
}
124150

125151
func (a *ApiServerWatcher) initNewCache(ctx context.Context) error {
126-
ips, err := a.getApiServerIPs(ctx)
152+
ips, err := a.resolveIPs(ctx, a.apiServerHostName)
127153
if err != nil {
128-
return err
154+
return fmt.Errorf("failed to resolve IPs: %w", err)
129155
}
130156

131-
// Reset the new cache.
157+
// Reset new cache.
132158
a.new = make(cache)
133159
for _, ip := range ips {
134160
a.new[ip] = struct{}{}
@@ -137,14 +163,14 @@ func (a *ApiServerWatcher) initNewCache(ctx context.Context) error {
137163
}
138164

139165
func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) {
140-
// check if there are new ips
166+
// Check if there are any new IPs.
141167
for k := range a.new {
142168
if _, ok := a.current[k]; !ok {
143169
created = append(created, k)
144170
}
145171
}
146172

147-
// check if there are deleted ips
173+
// Check if there are any deleted IPs.
148174
for k := range a.current {
149175
if _, ok := a.new[k]; !ok {
150176
deleted = append(deleted, k)
@@ -153,53 +179,35 @@ func (a *ApiServerWatcher) diffCache() (created, deleted []interface{}) {
153179
return
154180
}
155181

156-
func (a *ApiServerWatcher) getApiServerIPs(ctx context.Context) ([]string, error) {
157-
// Parse the URL
158-
host, err := a.retrieveApiServerHostname()
159-
if err != nil {
160-
return nil, err
161-
}
162-
163-
// Get the ips for the host
164-
ips, err := a.resolveIPs(ctx, host)
165-
if err != nil {
166-
return nil, err
167-
}
168-
169-
return ips, nil
170-
}
171-
172-
// parse url to extract hostname
173-
func (a *ApiServerWatcher) retrieveApiServerHostname() (string, error) {
174-
// Parse the URL
175-
url, err := url.Parse(a.apiServerUrl)
176-
if err != nil {
177-
fmt.Println("Failed to parse URL:", err)
178-
return "", err
179-
}
180-
181-
// Remove the scheme (http:// or https://) and port from the host
182-
host := strings.TrimPrefix(url.Host, "www.")
183-
colonIndex := strings.IndexByte(host, ':')
184-
if colonIndex != -1 {
185-
host = host[:colonIndex]
182+
func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) {
183+
// Perform a DNS lookup for the host name using the configured host resolver (net.DefaultResolver by default, which uses the local resolver).
184+
// Possible errors here are:
185+
// - Canceled context: The context was canceled before the lookup completed.
186+
// - DNS server errors, e.g. NXDOMAIN, SERVFAIL.
187+
// - Network errors, e.g. timeout, unreachable DNS server.
188+
// - Other DNS-related errors encapsulated in a DNSError.
189+
var hostIPs []string
190+
var err error
191+
192+
retryFunc := func() error {
193+
hostIPs, err = a.hostResolver.LookupHost(ctx, host)
194+
if err != nil {
195+
return fmt.Errorf("APIServer LookupHost failed: %w", err)
196+
}
197+
return nil
186198
}
187-
return host, nil
188-
}
189199

190-
// Resolve the list of ips for the given host
191-
func (a *ApiServerWatcher) resolveIPs(ctx context.Context, host string) ([]string, error) {
192-
hostIps, err := a.hostResolver.LookupHost(ctx, host)
200+
// Retry the lookup for hostIPs in case of failure.
201+
err = utils.Retry(retryFunc, hostLookupRetries)
193202
if err != nil {
194203
return nil, err
195204
}
196205

197-
if len(hostIps) == 0 {
198-
a.l.Error("no ips found for host", zap.String("host", host))
199-
return nil, fmt.Errorf("no ips found for host %s", host)
206+
if len(hostIPs) == 0 {
207+
a.l.Debug("no IPs found for host", zap.String("host", host))
200208
}
201209

202-
return hostIps, nil
210+
return hostIPs, nil
203211
}
204212

205213
func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) {
@@ -212,30 +220,23 @@ func (a *ApiServerWatcher) publish(netIPs []net.IP, eventType cc.EventType) {
212220
ipsToPublish = append(ipsToPublish, ip.String())
213221
}
214222
ps := pubsub.New()
215-
ps.Publish(common.PubSubAPIServer,
216-
cc.NewCacheEvent(
217-
eventType,
218-
common.NewAPIServerObject(ipsToPublish),
219-
),
220-
)
223+
ps.Publish(common.PubSubAPIServer, cc.NewCacheEvent(eventType, common.NewAPIServerObject(ipsToPublish)))
221224
a.l.Debug("Published event", zap.Any("eventType", eventType), zap.Any("netIPs", ipsToPublish))
222225
}
223226

224-
// getHostURL returns the host url from the config.
225-
func getHostURL() string {
226-
config, err := kcfg.GetConfig()
227+
func (a *ApiServerWatcher) getHostName() (string, error) {
228+
// Parse the host URL.
229+
hostURL := a.restConfig.Host
230+
parsedURL, err := url.ParseRequestURI(hostURL)
227231
if err != nil {
228-
log.Logger().Error("failed to get config", zap.Error(err))
229-
return ""
232+
log.Logger().Error("failed to parse URL", zap.String("url", hostURL), zap.Error(err))
233+
return "", fmt.Errorf("failed to parse URL: %w", err)
230234
}
231-
return config.Host
232-
}
233235

234-
// Get FilterManager
235-
func getFilterManager() *fm.FilterManager {
236-
f, err := fm.Init(filterManagerRetries)
237-
if err != nil {
238-
a.l.Error("failed to init filter manager", zap.Error(err))
236+
// Extract the host name from the URL.
237+
host := strings.TrimPrefix(parsedURL.Host, "www.")
238+
if colonIndex := strings.IndexByte(host, ':'); colonIndex != -1 {
239+
host = host[:colonIndex]
239240
}
240-
return f
241+
return host, nil
241242
}

0 commit comments

Comments
 (0)