Skip to content

Commit 7cac3dd

Browse files
committed
operator: Add tests for ConcurrentWatcher
Signed-off-by: Prem Saraswat <[email protected]>
1 parent 7c3618e commit 7cac3dd

5 files changed

+368
-10
lines changed

Diff for: operator/concurrentwatcher.go

+16-4
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,31 @@ type ConcurrentWatcher struct {
4242
// wrapped ResourceWatcher methods will not be called.
4343
func NewConcurrentWatcher(
4444
watcher ResourceWatcher, initialPoolSize uint64, errorHandler func(context.Context, error),
45-
) *ConcurrentWatcher {
45+
) (*ConcurrentWatcher, error) {
46+
if watcher == nil {
47+
return nil, fmt.Errorf("resource watcher cannot be nil")
48+
}
49+
if initialPoolSize <= 0 {
50+
return nil, fmt.Errorf("initial worker pool size needs to be greater than 0")
51+
}
52+
4653
cw := &ConcurrentWatcher{
4754
watcher: watcher,
4855
size: initialPoolSize,
49-
workers: make(map[uint64]*bufferedQueue),
50-
errorHandler: errorHandler,
56+
workers: make(map[uint64]*bufferedQueue, initialBufferSize),
57+
errorHandler: DefaultErrorHandler,
58+
}
59+
if errorHandler != nil {
60+
cw.errorHandler = errorHandler
5161
}
5262

5363
var i uint64
5464
for i < initialPoolSize {
5565
cw.workers[i] = newBufferedQueue(initialBufferSize)
66+
i++
5667
}
5768

58-
return cw
69+
return cw, nil
5970
}
6071

6172
func (w *ConcurrentWatcher) Add(ctx context.Context, object resource.Object) error {
@@ -91,6 +102,7 @@ func (w *ConcurrentWatcher) Delete(ctx context.Context, object resource.Object)
91102

92103
// Run starts a number of workers, processing the events concurrently by triggering the
93104
// methods of underlying watcher as per the event type.
105+
// Run will clean up and exit once the provided context is canceled.
94106
func (w *ConcurrentWatcher) Run(ctx context.Context) {
95107
var wg sync.WaitGroup
96108
for _, queue := range w.workers {

Diff for: operator/concurrentwatcher_test.go

+343
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package operator
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
13+
"github.com/grafana/grafana-app-sdk/resource"
14+
)
15+
16+
func TestNewConcurrentWatcher(t *testing.T) {
17+
t.Run("nil args", func(t *testing.T) {
18+
cw, err := NewConcurrentWatcher(nil, 0, nil)
19+
assert.Nil(t, cw)
20+
assert.EqualError(t, err, "resource watcher cannot be nil")
21+
22+
cw, err = NewConcurrentWatcher(&SimpleWatcher{}, 0, nil)
23+
assert.Nil(t, cw)
24+
assert.EqualError(t, err, "initial worker pool size needs to be greater than 0")
25+
26+
// In case of a nil errorHandler, we create a ConcurrentWatcher with DefaultErrorHandler
27+
cw, err = NewConcurrentWatcher(&SimpleWatcher{}, 1, nil)
28+
assert.NoError(t, err)
29+
assert.NotNil(t, cw)
30+
})
31+
32+
t.Run("success", func(t *testing.T) {
33+
var size uint64 = 2
34+
cw, err := NewConcurrentWatcher(&SimpleWatcher{}, size, DefaultErrorHandler)
35+
assert.NoError(t, err)
36+
assert.NotNil(t, cw)
37+
assert.Len(t, cw.workers, int(size))
38+
})
39+
}
40+
41+
func TestConcurrentWatcher_Add(t *testing.T) {
42+
ex := &resource.TypedSpecObject[string]{}
43+
schema := resource.NewSimpleSchema("group", "version", ex, &resource.TypedList[*resource.TypedSpecObject[string]]{})
44+
45+
t.Run("successful add with single worker", func(t *testing.T) {
46+
mock := &mockWatcher{}
47+
var errCount atomic.Int64
48+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
49+
assert.Nil(t, err)
50+
go cw.Run(t.Context())
51+
obj := schema.ZeroValue()
52+
err = cw.Add(t.Context(), obj)
53+
assert.Nil(t, err)
54+
// this should be enough for the workers to process the event from queue.
55+
time.Sleep(500 * time.Millisecond)
56+
assert.Equal(t, int64(1), mock.addAttempts.Load())
57+
assert.Equal(t, int64(0), errCount.Load())
58+
})
59+
60+
t.Run("error handler should be called in case of an error", func(t *testing.T) {
61+
mock := &mockWatcher{}
62+
mock.AddFunc = func(ctx context.Context, o resource.Object) error {
63+
return fmt.Errorf("IT'S-A ME, ERRORIO!")
64+
}
65+
var errCount atomic.Int64
66+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
67+
assert.Nil(t, err)
68+
go cw.Run(t.Context())
69+
obj := schema.ZeroValue()
70+
err = cw.Add(t.Context(), obj)
71+
assert.Nil(t, err)
72+
// this should be enough for the workers to process the event from queue.
73+
time.Sleep(500 * time.Millisecond)
74+
assert.Equal(t, int64(1), mock.addAttempts.Load())
75+
assert.Equal(t, int64(1), errCount.Load())
76+
})
77+
78+
t.Run("successful adds with multiple workers", func(t *testing.T) {
79+
mock := &mockWatcher{}
80+
var errCount atomic.Int64
81+
cw, err := NewConcurrentWatcher(mock, 3, func(ctx context.Context, err error) { errCount.Add(1) })
82+
assert.Nil(t, err)
83+
go cw.Run(t.Context())
84+
obj1 := schema.ZeroValue()
85+
obj1.SetName("one")
86+
obj2 := schema.ZeroValue()
87+
obj2.SetName("two")
88+
obj3 := schema.ZeroValue()
89+
obj3.SetName("three")
90+
err = cw.Add(t.Context(), obj1)
91+
assert.Nil(t, err)
92+
err = cw.Add(t.Context(), obj2)
93+
assert.Nil(t, err)
94+
err = cw.Add(t.Context(), obj3)
95+
assert.Nil(t, err)
96+
// this should be enough for the workers to process the event from queue.
97+
time.Sleep(500 * time.Millisecond)
98+
assert.Equal(t, int64(3), mock.addAttempts.Load())
99+
assert.Equal(t, int64(0), errCount.Load())
100+
})
101+
}
102+
103+
func TestConcurrentWatcher_Update(t *testing.T) {
104+
ex := &resource.TypedSpecObject[string]{}
105+
schema := resource.NewSimpleSchema("group", "version", ex, &resource.TypedList[*resource.TypedSpecObject[string]]{})
106+
107+
t.Run("successful update with single worker", func(t *testing.T) {
108+
mock := &mockWatcher{}
109+
var errCount atomic.Int64
110+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
111+
assert.Nil(t, err)
112+
go cw.Run(t.Context())
113+
obj := schema.ZeroValue()
114+
err = cw.Update(t.Context(), obj, obj)
115+
assert.Nil(t, err)
116+
// this should be enough for the workers to process the event from queue.
117+
time.Sleep(500 * time.Millisecond)
118+
assert.Equal(t, int64(1), mock.updateAttempts.Load())
119+
assert.Equal(t, int64(0), errCount.Load())
120+
})
121+
122+
t.Run("error handler should be called in case of an error", func(t *testing.T) {
123+
mock := &mockWatcher{}
124+
mock.UpdateFunc = func(_ context.Context, _, _ resource.Object) error {
125+
return fmt.Errorf("IT'S-A ME, ERRORIO!")
126+
}
127+
var errCount atomic.Int64
128+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
129+
assert.Nil(t, err)
130+
go cw.Run(t.Context())
131+
obj := schema.ZeroValue()
132+
err = cw.Update(t.Context(), obj, obj)
133+
assert.Nil(t, err)
134+
// this should be enough for the workers to process the event from queue.
135+
time.Sleep(500 * time.Millisecond)
136+
assert.Equal(t, int64(1), mock.updateAttempts.Load())
137+
assert.Equal(t, int64(1), errCount.Load())
138+
})
139+
140+
t.Run("successful updates with multiple workers", func(t *testing.T) {
141+
mock := &mockWatcher{}
142+
var errCount atomic.Int64
143+
cw, err := NewConcurrentWatcher(mock, 3, func(ctx context.Context, err error) { errCount.Add(1) })
144+
assert.Nil(t, err)
145+
go cw.Run(t.Context())
146+
obj1 := schema.ZeroValue()
147+
obj1.SetName("one")
148+
obj2 := schema.ZeroValue()
149+
obj2.SetName("two")
150+
obj3 := schema.ZeroValue()
151+
obj3.SetName("three")
152+
err = cw.Update(t.Context(), obj1, obj1)
153+
assert.Nil(t, err)
154+
err = cw.Update(t.Context(), obj2, obj2)
155+
assert.Nil(t, err)
156+
err = cw.Update(t.Context(), obj3, obj3)
157+
assert.Nil(t, err)
158+
// this should be enough for the workers to process the event from queue.
159+
time.Sleep(500 * time.Millisecond)
160+
assert.Equal(t, int64(3), mock.updateAttempts.Load())
161+
assert.Equal(t, int64(0), errCount.Load())
162+
})
163+
}
164+
165+
func TestConcurrentWatcher_Delete(t *testing.T) {
166+
ex := &resource.TypedSpecObject[string]{}
167+
schema := resource.NewSimpleSchema("group", "version", ex, &resource.TypedList[*resource.TypedSpecObject[string]]{})
168+
169+
t.Run("successful delete with single worker", func(t *testing.T) {
170+
mock := &mockWatcher{}
171+
var errCount atomic.Int64
172+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
173+
assert.Nil(t, err)
174+
go cw.Run(t.Context())
175+
obj := schema.ZeroValue()
176+
err = cw.Delete(t.Context(), obj)
177+
assert.Nil(t, err)
178+
// this should be enough for the workers to process the event from queue.
179+
time.Sleep(500 * time.Millisecond)
180+
assert.Equal(t, int64(1), mock.deleteAttempts.Load())
181+
assert.Equal(t, int64(0), errCount.Load())
182+
})
183+
184+
t.Run("error handler should be called in case of an error", func(t *testing.T) {
185+
mock := &mockWatcher{}
186+
mock.DeleteFunc = func(_ context.Context, _ resource.Object) error {
187+
return fmt.Errorf("IT'S-A ME, ERRORIO!")
188+
}
189+
var errCount atomic.Int64
190+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
191+
assert.Nil(t, err)
192+
go cw.Run(t.Context())
193+
obj := schema.ZeroValue()
194+
err = cw.Delete(t.Context(), obj)
195+
assert.Nil(t, err)
196+
// this should be enough for the workers to process the event from queue.
197+
time.Sleep(500 * time.Millisecond)
198+
assert.Equal(t, int64(1), mock.deleteAttempts.Load())
199+
assert.Equal(t, int64(1), errCount.Load())
200+
})
201+
202+
t.Run("successful deletes with multiple workers", func(t *testing.T) {
203+
mock := &mockWatcher{}
204+
var errCount atomic.Int64
205+
cw, err := NewConcurrentWatcher(mock, 3, func(ctx context.Context, err error) { errCount.Add(1) })
206+
assert.Nil(t, err)
207+
go cw.Run(t.Context())
208+
obj1 := schema.ZeroValue()
209+
obj1.SetName("one")
210+
obj2 := schema.ZeroValue()
211+
obj2.SetName("two")
212+
obj3 := schema.ZeroValue()
213+
obj3.SetName("three")
214+
err = cw.Delete(t.Context(), obj1)
215+
assert.Nil(t, err)
216+
err = cw.Delete(t.Context(), obj2)
217+
assert.Nil(t, err)
218+
err = cw.Delete(t.Context(), obj3)
219+
assert.Nil(t, err)
220+
// this should be enough for the workers to process the event from queue.
221+
time.Sleep(500 * time.Millisecond)
222+
assert.Equal(t, int64(3), mock.deleteAttempts.Load())
223+
assert.Equal(t, int64(0), errCount.Load())
224+
})
225+
}
226+
227+
func TestConcurrentWatcher(t *testing.T) {
228+
ex := &resource.TypedSpecObject[string]{}
229+
schema := resource.NewSimpleSchema("group", "version", ex, &resource.TypedList[*resource.TypedSpecObject[string]]{})
230+
231+
t.Run("successfully trigger appropriate handler methods with single worker", func(t *testing.T) {
232+
mock := &mockWatcher{}
233+
var errCount atomic.Int64
234+
cw, err := NewConcurrentWatcher(mock, 1, func(ctx context.Context, err error) { errCount.Add(1) })
235+
assert.Nil(t, err)
236+
go cw.Run(t.Context())
237+
obj := schema.ZeroValue()
238+
err = cw.Add(t.Context(), obj)
239+
assert.Nil(t, err)
240+
err = cw.Update(t.Context(), obj, obj)
241+
assert.Nil(t, err)
242+
err = cw.Update(t.Context(), obj, obj)
243+
assert.Nil(t, err)
244+
err = cw.Delete(t.Context(), obj)
245+
assert.Nil(t, err)
246+
// this should be enough for the workers to process the event from queue.
247+
time.Sleep(500 * time.Millisecond)
248+
assert.Equal(t, int64(1), mock.addAttempts.Load())
249+
assert.Equal(t, int64(2), mock.updateAttempts.Load())
250+
assert.Equal(t, int64(1), mock.deleteAttempts.Load())
251+
assert.Equal(t, int64(0), errCount.Load())
252+
})
253+
254+
t.Run("successfully trigger appropriate handler methods with multiple workers", func(t *testing.T) {
255+
mock := &mockWatcher{}
256+
var errCount atomic.Int64
257+
cw, err := NewConcurrentWatcher(mock, 3, func(ctx context.Context, err error) { errCount.Add(1) })
258+
assert.Nil(t, err)
259+
go cw.Run(t.Context())
260+
for i := 0; i < 3; i++ {
261+
obj := schema.ZeroValue()
262+
obj.SetName(strconv.Itoa(i))
263+
err = cw.Add(t.Context(), obj)
264+
assert.Nil(t, err)
265+
err = cw.Update(t.Context(), obj, obj)
266+
assert.Nil(t, err)
267+
err = cw.Update(t.Context(), obj, obj)
268+
assert.Nil(t, err)
269+
err = cw.Delete(t.Context(), obj)
270+
assert.Nil(t, err)
271+
}
272+
// this should be enough for the workers to process the event from queue.
273+
time.Sleep(500 * time.Millisecond)
274+
assert.Equal(t, int64(1*3), mock.addAttempts.Load())
275+
assert.Equal(t, int64(2*3), mock.updateAttempts.Load())
276+
assert.Equal(t, int64(1*3), mock.deleteAttempts.Load())
277+
assert.Equal(t, int64(0), errCount.Load())
278+
})
279+
280+
t.Run("event for the same object should be processed sequentially (ie by the same worker)", func(t *testing.T) {
281+
events := make([]string, 0)
282+
mock := &mockWatcher{}
283+
mock.AddFunc = func(ctx context.Context, o resource.Object) error {
284+
events = append(events, "add")
285+
return nil
286+
}
287+
mock.UpdateFunc = func(ctx context.Context, _, _ resource.Object) error {
288+
events = append(events, "update")
289+
return nil
290+
}
291+
mock.DeleteFunc = func(ctx context.Context, o resource.Object) error {
292+
events = append(events, "delete")
293+
return nil
294+
}
295+
var errCount atomic.Int64
296+
cw, err := NewConcurrentWatcher(mock, 4, func(ctx context.Context, err error) { errCount.Add(1) })
297+
assert.Nil(t, err)
298+
go cw.Run(t.Context())
299+
{
300+
obj := schema.ZeroValue()
301+
obj.SetName("one")
302+
err = cw.Add(t.Context(), obj)
303+
assert.Nil(t, err)
304+
err = cw.Update(t.Context(), obj, obj)
305+
assert.Nil(t, err)
306+
err = cw.Update(t.Context(), obj, obj)
307+
assert.Nil(t, err)
308+
err = cw.Delete(t.Context(), obj)
309+
assert.Nil(t, err)
310+
}
311+
// this should be enough for the workers to process the event from queue.
312+
time.Sleep(500 * time.Millisecond)
313+
assert.Equal(t, int64(1), mock.addAttempts.Load())
314+
assert.Equal(t, int64(2), mock.updateAttempts.Load())
315+
assert.Equal(t, int64(1), mock.deleteAttempts.Load())
316+
assert.Equal(t, int64(0), errCount.Load())
317+
// Events recieved should be in the same order of events triggered always.
318+
assert.Equal(t, []string{"add", "update", "update", "delete"}, events)
319+
})
320+
}
321+
322+
type mockWatcher struct {
323+
addAttempts atomic.Int64
324+
updateAttempts atomic.Int64
325+
deleteAttempts atomic.Int64
326+
327+
SimpleWatcher
328+
}
329+
330+
func (mw *mockWatcher) Add(ctx context.Context, obj resource.Object) error {
331+
mw.addAttempts.Add(1)
332+
return mw.SimpleWatcher.Add(ctx, obj)
333+
}
334+
335+
func (mw *mockWatcher) Update(ctx context.Context, src, tgt resource.Object) error {
336+
mw.updateAttempts.Add(1)
337+
return mw.SimpleWatcher.Update(ctx, src, tgt)
338+
}
339+
340+
func (mw *mockWatcher) Delete(ctx context.Context, obj resource.Object) error {
341+
mw.deleteAttempts.Add(1)
342+
return mw.SimpleWatcher.Delete(ctx, obj)
343+
}

0 commit comments

Comments
 (0)