-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrolling_window.go
117 lines (97 loc) · 2.45 KB
/
rolling_window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package limit
import (
"context"
"sync"
"time"
)
type eventLog struct {
timestamp time.Time
}
type rollingWindow struct {
// Mutex
mux sync.Mutex
// Config
maxEventCount int
rateDuration time.Duration
// State
allowedEvents int
deniedEvents int
rollingWindow []eventLog
}
// NewRollingWindow creates a new rolling window rate limiter.
// The count parameter is the number of events allowed in the duration.
// The duration parameter is the time window in which the events are allowed.
func NewRollingWindow(count int, duration time.Duration) Limiter {
return &rollingWindow{
mux: sync.Mutex{},
maxEventCount: count,
rateDuration: duration,
rollingWindow: make([]eventLog, 0),
}
}
func (r *rollingWindow) WaitContext(ctx context.Context) error {
for {
r.mux.Lock()
r.removeExpiredEvents()
if len(r.rollingWindow) < r.maxEventCount {
r.rollingWindow = append(r.rollingWindow, eventLog{timestamp: time.Now()})
r.allowedEvents++
r.mux.Unlock()
return nil
}
r.mux.Unlock()
select {
case <-ctx.Done():
r.mux.Lock()
r.deniedEvents++
r.mux.Unlock()
return ctx.Err()
case <-time.After(r.rollingWindow[0].timestamp.Add(r.rateDuration).Sub(time.Now())):
// Wait until the next event is allowed
}
}
}
func (r *rollingWindow) Wait() {
_ = r.WaitContext(context.Background())
}
func (r *rollingWindow) WaitTimeout(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return r.WaitContext(ctx)
}
func (r *rollingWindow) Allow() bool {
r.mux.Lock()
defer r.mux.Unlock()
r.removeExpiredEvents()
// Check if the event can be allowed
if len(r.rollingWindow) < r.maxEventCount {
r.rollingWindow = append(r.rollingWindow, eventLog{timestamp: time.Now()})
r.allowedEvents++
return true
}
r.deniedEvents++
return false
}
func (r *rollingWindow) removeExpiredEvents() {
for len(r.rollingWindow) > 0 && time.Since(r.rollingWindow[0].timestamp) > r.rateDuration {
r.rollingWindow = r.rollingWindow[1:]
}
}
func (r *rollingWindow) Clear() {
r.mux.Lock()
defer r.mux.Unlock()
r.rollingWindow = make([]eventLog, 0)
}
func (r *rollingWindow) Stats() Stats {
r.mux.Lock()
defer r.mux.Unlock()
nextAllowedTime := time.Now()
if len(r.rollingWindow) > 0 {
nextAllowedTime = r.rollingWindow[0].timestamp.Add(r.rateDuration)
}
return Stats{
AllowedRequests: r.allowedEvents,
DeniedRequests: r.deniedEvents,
NextAllowedTime: nextAllowedTime,
}
}