|
5 | 5 | package cilium
|
6 | 6 |
|
7 | 7 | import (
|
8 |
| - "bytes" |
9 | 8 | "context"
|
10 | 9 | "encoding/gob"
|
11 | 10 | "errors"
|
12 | 11 | "io"
|
13 | 12 | "net"
|
14 | 13 | "time"
|
15 | 14 |
|
| 15 | + "github.com/cilium/cilium/api/v1/flow" |
16 | 16 | v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
|
17 |
| - observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types" |
18 |
| - "github.com/cilium/cilium/pkg/monitor" |
19 |
| - monitorAPI "github.com/cilium/cilium/pkg/monitor/api" |
20 | 17 | "github.com/cilium/cilium/pkg/monitor/payload"
|
21 | 18 | kcfg "github.com/microsoft/retina/pkg/config"
|
22 | 19 | "github.com/microsoft/retina/pkg/enricher"
|
23 | 20 | "github.com/microsoft/retina/pkg/log"
|
24 | 21 | "github.com/microsoft/retina/pkg/plugin/api"
|
25 | 22 | "go.uber.org/zap"
|
26 |
| - "google.golang.org/protobuf/types/known/timestamppb" |
27 | 23 | )
|
28 | 24 |
|
29 | 25 | const (
|
@@ -51,6 +47,10 @@ func (c *cilium) Compile(ctx context.Context) error {
|
51 | 47 | }
|
52 | 48 |
|
53 | 49 | func (c *cilium) Init() error {
|
| 50 | + c.p = &parser{ |
| 51 | + l: c.l, |
| 52 | + } |
| 53 | + c.p.Init() |
54 | 54 | c.l.Info("Initialized cilium plugin")
|
55 | 55 | return nil
|
56 | 56 | }
|
@@ -118,94 +118,13 @@ func (c *cilium) monitorLoop(ctx context.Context) error {
|
118 | 118 | c.l.Error("Failed to decode payload", zap.Error(err))
|
119 | 119 | return err
|
120 | 120 | }
|
121 |
| - // monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc)) |
122 |
| - |
123 |
| - // hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor. |
124 |
| - // specifically, the hubble consumer adds the event to the observer's event channel |
125 |
| - |
126 |
| - // Agent Events |
127 |
| - // - MessageTypeAccessLog: accesslog.LogRecord |
128 |
| - // - MessageTypeAgent: api.AgentNotify |
129 |
| - // Perf Events |
130 |
| - // - MessageTypeDrop: monitor.DropNotify |
131 |
| - // - MessageTypeDebug: monitor.DebugMsg |
132 |
| - // - MessageTypeCapture: monitor.DebugCapture |
133 |
| - // - MessageTypeTrace: monitor.TraceNotify |
134 |
| - // - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify |
135 |
| - |
136 |
| - c.l.Debug("Received cilium event", zap.Int("type", pl.Type), zap.Any("payload", pl.Data)) |
137 |
| - switch pl.Type { |
138 |
| - case payload.EventSample: |
139 |
| - data := pl.Data |
140 |
| - messageType := data[0] |
141 |
| - switch messageType { |
142 |
| - // Agent Events |
143 |
| - case monitorAPI.MessageTypeAccessLog: |
144 |
| - buf := bytes.NewBuffer(data[1:]) |
145 |
| - dec := gob.NewDecoder(buf) |
146 |
| - lr := monitor.LogRecordNotify{} |
147 |
| - if err := dec.Decode(&lr); err != nil { |
148 |
| - c.l.Error("Failed to decode log record notify", zap.Error(err)) |
149 |
| - continue |
150 |
| - } |
151 |
| - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: lr} |
152 |
| - case monitorAPI.MessageTypeAgent: |
153 |
| - buf := bytes.NewBuffer(data[1:]) |
154 |
| - dec := gob.NewDecoder(buf) |
155 |
| - an := monitorAPI.AgentNotifyMessage{} |
156 |
| - if err := dec.Decode(&an); err != nil { |
157 |
| - c.l.Error("Failed to decode agent notify", zap.Error(err)) |
158 |
| - continue |
159 |
| - } |
160 |
| - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: an} |
161 |
| - // Perf events |
162 |
| - // case monitorAPI.MessageTypeDrop: |
163 |
| - // dn := monitor.DropNotify{} |
164 |
| - // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil { |
165 |
| - // c.l.Error("Failed to decode drop notify", zap.Error(err)) |
166 |
| - // continue |
167 |
| - // } |
168 |
| - // c.l.Info("Drop event", zap.Any("data", dn)) |
169 |
| - // case monitorAPI.MessageTypeTrace: |
170 |
| - // tn := monitor.TraceNotify{} |
171 |
| - // if err := monitor.DecodeTraceNotify(data, &tn); err != nil { |
172 |
| - // c.l.Error("Failed to decode trace notify", zap.Error(err)) |
173 |
| - // continue |
174 |
| - // } |
175 |
| - // c.l.Info("Trace event", zap.Any("data", tn)) |
176 |
| - // case monitorAPI.MessageTypePolicyVerdict: |
177 |
| - // pn := monitor.PolicyVerdictNotify{} |
178 |
| - // if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil { |
179 |
| - // c.l.Error("Failed to decode policy verdict notify", zap.Error(err)) |
180 |
| - // continue |
181 |
| - // } |
182 |
| - // c.l.Info("Policy verdict event", zap.Any("data", pn)) |
183 |
| - // case monitorAPI.MessageTypeDebug: |
184 |
| - // c.l.Info("Debug event", zap.Any("data", data)) |
185 |
| - // case monitorAPI.MessageTypeCapture: |
186 |
| - // c.l.Info("Capture event", zap.Any("data", data)) |
187 |
| - // case monitorAPI.MessageTypeRecCapture: |
188 |
| - // c.l.Info("Recorder capture event", zap.Any("data", data)) |
189 |
| - // case monitorAPI.MessageTypeTraceSock: |
190 |
| - // c.l.Info("Trace sock event", zap.Any("data", data)) |
191 |
| - // default: |
192 |
| - // c.l.Info("Unknown event", zap.Any("data", data)) |
193 |
| - default: |
194 |
| - ev := observerTypes.PerfEvent{ |
195 |
| - CPU: pl.CPU, |
196 |
| - Data: pl.Data, |
197 |
| - } |
198 |
| - c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: ev} |
199 |
| - } |
200 |
| - case payload.RecordLost: |
201 |
| - c.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost)) |
202 |
| - default: |
203 |
| - c.l.Warn("Unknown event type", zap.Int("type", pl.Type)) |
204 |
| - continue |
| 121 | + var fl *flow.Flow |
| 122 | + err := c.p.Decode(&pl, fl) |
| 123 | + if err == nil { |
| 124 | + c.externalChannel <- &v1.Event{Timestamp: fl.GetTime(), Event: fl} |
| 125 | + } else { |
| 126 | + c.l.Warn("Failed to decode to flow", zap.Error(err)) |
205 | 127 | }
|
206 |
| - // if newMA, ok := c.monitorAgent.(AgentV2); ok { |
207 |
| - // newMA.SendPerfEvent(record) |
208 |
| - // } |
209 | 128 | }
|
210 | 129 | }
|
211 | 130 | }
|
0 commit comments