-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathssemanager.go
188 lines (172 loc) · 5.17 KB
/
ssemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package devcycle
import (
"context"
"encoding/json"
"fmt"
"github.com/devcyclehq/go-server-sdk/v2/api"
"github.com/devcyclehq/go-server-sdk/v2/util"
"github.com/launchdarkly/eventsource"
"sync/atomic"
"time"
)
// SSEManager owns a single server-sent-events subscription and turns the
// events it receives into client events for the configuration manager.
type SSEManager struct {
	// configManager receives connection and config-update notifications on
	// its InternalClientEvents channel.
	configManager *EnvironmentConfigManager
	// options supplies RequestTimeout for the stream and the optional
	// ClientEventHandler channel that raw events are forwarded to.
	options *Options
	// stream is the currently open event stream; nil when none is open or
	// after StopSSE.
	stream *eventsource.Stream
	// eventChannel is the Events channel of the current stream.
	eventChannel chan eventsource.Event
	// url is the last URL passed to StartSSEOverride.
	url string
	// errorHandler is handed to the stream as its error handler.
	errorHandler eventsource.StreamErrorHandler
	// context is cancelled by stopEventHandler; the receive loop stops once
	// it is done.
	context context.Context
	// stopEventHandler cancels context. It is invoked by Close.
	stopEventHandler context.CancelFunc
	// cfg provides the HTTP client used to open the stream.
	cfg *HTTPConfiguration
	// Started is set to true once a stream has been opened successfully.
	Started bool
	// Connected is true after a successful connect and false once the
	// receive loop has stopped.
	Connected atomic.Bool
}
// sseEvent is the JSON envelope carried in the data of a stream event.
// Only Data is consumed in this file; the remaining fields are decoded but
// not otherwise used here.
type sseEvent struct {
	// Id is the envelope's "id" field.
	Id string `json:"id"`
	// Timestamp is the envelope's "timestamp" field.
	Timestamp float64 `json:"timestamp"`
	// Channel is the envelope's "channel" field.
	Channel string `json:"channel"`
	// Data is a JSON document encoded as a string; it decodes into sseMessage.
	Data string `json:"data"`
	// Name is the envelope's "name" field.
	Name string `json:"name"`
}
// sseMessage is the payload found in the Data field of an sseEvent.
type sseMessage struct {
	// Etag is the message's "etag" field.
	Etag string `json:"etag,omitempty"`
	// LastModified is a timestamp expressed in milliseconds.
	LastModified float64 `json:"lastModified,omitempty"`
	// Type_ is the message type, e.g. "refetchConfig"; it may be empty.
	Type_ string `json:"type,omitempty"`
}

// LastModifiedDuration returns LastModified as a time.Duration, treating the
// value as a count of milliseconds. Any fractional millisecond is truncated
// by the float-to-integer conversion.
func (m *sseMessage) LastModifiedDuration() time.Duration {
	wholeMillis := time.Duration(m.LastModified)
	return wholeMillis * time.Millisecond
}
// newSSEManager builds an SSEManager that is not yet connected to any stream.
//
// options must be non-nil, otherwise an error is returned. configManager and
// cfg are stored as given without validation. The manager gets its own
// cancellable context derived from context.Background(); Close cancels it.
func newSSEManager(configManager *EnvironmentConfigManager, options *Options, cfg *HTTPConfiguration) (*SSEManager, error) {
	if options == nil {
		return nil, fmt.Errorf("SSE - Options cannot be nil")
	}

	// Stream errors are only logged; the stream is told not to close so the
	// eventsource library keeps handling the connection.
	onStreamError := func(err error) eventsource.StreamErrorHandlerResult {
		util.Debugf("SSE - Error: %v\n", err)
		return eventsource.StreamErrorHandlerResult{CloseNow: false}
	}

	ctx, cancel := context.WithCancel(context.Background())

	manager := &SSEManager{
		configManager:    configManager,
		options:          options,
		errorHandler:     onStreamError,
		cfg:              cfg,
		context:          ctx,
		stopEventHandler: cancel,
	}
	manager.Connected.Store(false)

	return manager, nil
}
// connectSSE (re)opens the event stream against url.
//
// Any stream that is currently open is closed first so that at most one
// stream is active. The outcome is always reported on
// configManager.InternalClientEvents when the function returns: an
// InternalSSEConnected event on success, or an InternalSSEFailure event
// carrying the subscription error on failure. Note that this send blocks
// until the event is accepted by the channel.
//
// On success the manager records the new stream, marks itself Started and
// Connected, and starts receiveSSEMessages in a new goroutine. On failure the
// manager is left with no stream and Connected set to false.
//
// It returns the error from eventsource.SubscribeWithURL, or nil on success.
func (m *SSEManager) connectSSE(url string) error {
	// Close the previous stream before opening a new one so two streams are
	// never open at once and nothing keeps reading from the old one.
	if m.stream != nil {
		m.stream.Close()
	}

	sseClientEvent := api.ClientEvent{
		EventType: api.ClientEventType_InternalSSEConnected,
		EventData: "Connected to SSE stream: " + url,
		Status:    "success",
		Error:     nil,
	}
	// Deferred so that whichever outcome was recorded in sseClientEvent by the
	// time we return is the one that gets published.
	defer func() {
		m.configManager.InternalClientEvents <- sseClientEvent
	}()

	sse, err := eventsource.SubscribeWithURL(url,
		eventsource.StreamOptionReadTimeout(m.options.RequestTimeout),
		eventsource.StreamOptionCanRetryFirstConnection(m.options.RequestTimeout),
		eventsource.StreamOptionErrorHandler(m.errorHandler),
		eventsource.StreamOptionUseBackoff(m.options.RequestTimeout),
		eventsource.StreamOptionUseJitter(0.25),
		eventsource.StreamOptionHTTPClient(m.cfg.HTTPClient))
	if err != nil {
		// The previous stream (if any) was closed above and no new one was
		// opened, so drop the stale reference and stop reporting a live
		// connection.
		m.stream = nil
		m.Connected.Store(false)

		sseClientEvent.EventType = api.ClientEventType_InternalSSEFailure
		sseClientEvent.Status = "failure"
		sseClientEvent.Error = err
		sseClientEvent.EventData = "Error connecting to SSE stream: " + url
		return err
	}

	m.Connected.Store(true)
	m.stream = sse
	m.eventChannel = sse.Events
	m.Started = true
	go m.receiveSSEMessages()
	return nil
}
// parseMessage decodes a raw event payload in two steps: rawMessage is first
// decoded as an sseEvent envelope, then the envelope's Data string is decoded
// as the sseMessage that is returned.
//
// If the envelope cannot be decoded, the zero sseMessage and the error are
// returned. If the inner document cannot be decoded, the error is returned
// together with whatever json.Unmarshal populated in message.
func (m *SSEManager) parseMessage(rawMessage []byte) (message sseMessage, err error) {
	var envelope sseEvent
	if err = json.Unmarshal(rawMessage, &envelope); err != nil {
		return message, err
	}

	// Data is itself a JSON document carried as a string.
	inner := []byte(envelope.Data)
	err = json.Unmarshal(inner, &message)
	return message, err
}
// receiveSSEMessages consumes events from the stream that was current when it
// was started, until that stream goes away or the manager's context is
// cancelled. connectSSE runs it in its own goroutine.
//
// For every event received it:
//   - forwards the raw event to options.ClientEventHandler (when set) as a
//     RealtimeUpdates client event; the send is done in its own goroutine, so
//     it never blocks this loop;
//   - parses the payload and, for "refetchConfig" messages or messages with
//     no type, publishes an InternalNewConfigAvailable event carrying the
//     message's lastModified time on configManager.InternalClientEvents.
//
// Payloads that fail to parse are logged at debug level and skipped.
//
// If, at the top of an iteration, the stream has been cleared or the context
// is already cancelled, Connected is set to false and an InternalSSEFailure
// event is published before returning. If the context is cancelled while
// waiting for an event, Connected is set to false and the loop returns
// without publishing an event.
func (m *SSEManager) receiveSSEMessages() {
	// Bind this receiver to the channel of the stream it was started for, so a
	// later reconnect (which swaps m.eventChannel) cannot turn it into a second
	// consumer of the new stream.
	events := m.eventChannel

	for {
		// If the stream is killed/stopped - we should stop polling
		if m.stream == nil || m.context.Err() != nil {
			m.Connected.Store(false)
			m.configManager.InternalClientEvents <- api.ClientEvent{
				EventType: api.ClientEventType_InternalSSEFailure,
				EventData: "SSE stream has been stopped",
				Status:    "failure",
				Error:     m.context.Err(),
			}
			return
		}

		select {
		case <-m.context.Done():
			m.Connected.Store(false)
			return
		case event, ok := <-events:
			if !ok {
				// The channel is closed (the eventsource library is expected
				// to close Events once the stream is closed - confirm against
				// the library version in use). A receive on a closed channel
				// never blocks, so looping here would spin the CPU.
				if m.stream != nil && m.context.Err() == nil {
					// The manager still holds a stream, i.e. this one is
					// being replaced by connectSSE (which starts a fresh
					// receiver on success); this receiver has nothing left
					// to read.
					return
				}
				// Stopped via StopSSE or Close: let the check at the top of
				// the loop publish the "stopped" event.
				continue
			}

			if m.options.ClientEventHandler != nil {
				go func() {
					m.options.ClientEventHandler <- api.ClientEvent{
						EventType: api.ClientEventType_RealtimeUpdates,
						EventData: event,
						Status:    "info",
						Error:     nil,
					}
				}()
			}

			message, err := m.parseMessage([]byte(event.Data()))
			if err != nil {
				util.Debugf("SSE - Error unmarshalling message: %v\n", err)
				continue
			}

			if message.Type_ == "refetchConfig" || message.Type_ == "" {
				util.Debugf("SSE - Received refetchConfig message: %v\n", message)
				m.configManager.InternalClientEvents <- api.ClientEvent{
					EventType: api.ClientEventType_InternalNewConfigAvailable,
					EventData: time.UnixMilli(int64(message.LastModified)),
					Status:    "",
					Error:     nil,
				}
			}
		}
	}
}
// StartSSEOverride remembers url as the manager's stream URL and connects to
// it. It returns the error from connectSSE, or nil when the stream was
// opened.
func (m *SSEManager) StartSSEOverride(url string) error {
	m.url = url
	if err := m.connectSSE(url); err != nil {
		return err
	}
	return nil
}
// StopSSE closes the current stream, if there is one, and clears the
// manager's reference to it. Calling it when no stream is open is a no-op.
func (m *SSEManager) StopSSE() {
	if m.stream == nil {
		return
	}
	m.stream.Close()
	// Clearing the reference is what the receive loop checks to detect that
	// the stream has been stopped.
	m.stream = nil
}
// Close cancels the manager's context, which makes the receive loop stop.
// It does not close the stream itself; StopSSE does that.
func (m *SSEManager) Close() {
	cancel := m.stopEventHandler
	cancel()
}