Skip to content

Commit d2ba1f9

Browse files
committed
update to add decoder for cilium plugin
1 parent 24132d6 commit d2ba1f9

File tree

6 files changed

+366
-135
lines changed

6 files changed

+366
-135
lines changed

cmd/hubble/daemon_linux.go

+1-15
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@ import (
1919

2020
"github.com/cilium/cilium/pkg/hive/cell"
2121
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
22-
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
2322
"github.com/cilium/cilium/pkg/ipcache"
2423
"github.com/cilium/cilium/pkg/k8s"
2524
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
2625
"github.com/cilium/cilium/pkg/k8s/watchers"
27-
"github.com/cilium/cilium/pkg/monitor"
2826
monitoragent "github.com/cilium/cilium/pkg/monitor/agent"
29-
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
3027
"github.com/cilium/cilium/pkg/node"
3128
"github.com/cilium/workerpool"
3229

@@ -148,18 +145,7 @@ func (d *Daemon) generateEvents(ctx context.Context) {
148145
return
149146
case event := <-d.eventChan:
150147
d.log.WithField("event", event).Debug("Sending event to monitor agent")
151-
var err error
152-
switch event.Event.(type) {
153-
case monitor.LogRecordNotify:
154-
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAccessLog, event.Event)
155-
case monitorAPI.AgentNotifyMessage:
156-
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAgent, event.Event)
157-
case observerTypes.PerfEvent:
158-
// send 1 here to signal bpf event
159-
err = d.monitorAgent.SendEvent(1, event.Event)
160-
default:
161-
err = d.monitorAgent.SendEvent(0, event.Event)
162-
}
148+
err := d.monitorAgent.SendEvent(0, event)
163149
if err != nil {
164150
d.log.WithError(err).Error("Unable to send event to monitor agent")
165151
}

pkg/hubble/common/decoder.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
package common
1+
package common

pkg/monitoragent/monitoragent_linux.go

+1-28
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"fmt"
99

1010
"github.com/cilium/cilium/api/v1/models"
11-
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
1211
"github.com/cilium/cilium/pkg/lock"
1312
"github.com/cilium/cilium/pkg/monitor/agent/consumer"
1413
"github.com/cilium/cilium/pkg/monitor/agent/listener"
@@ -65,19 +64,7 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
6564
// While we want to avoid marshalling events if there are no active
6665
// listeners, there's no need to check for active consumers ahead of time.
6766

68-
log.Info("SendEvent called")
69-
// perfEvents are bpf datapath events (reserved for 0-128), where 0 = unspecified
70-
if typ > 0 && typ <= 128 {
71-
log.Info("SendEvent called with PERF event")
72-
event, ok := event.(observerTypes.PerfEvent)
73-
if !ok {
74-
return fmt.Errorf("unexpected event type for perf event")
75-
}
76-
a.sendPerfEvent(event)
77-
} else {
78-
log.Info("SendEvent called with AGENT event")
79-
a.notifyAgentEvent(typ, event)
80-
}
67+
a.notifyAgentEvent(typ, event)
8168

8269
// do not marshal notifications if there are no active listeners
8370
if !a.hasListeners() {
@@ -111,20 +98,6 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
11198
return nil
11299
}
113100

114-
func (a *monitorAgent) sendPerfEvent(event observerTypes.PerfEvent) {
115-
a.Lock()
116-
defer a.Unlock()
117-
a.notifyPerfEventLocked(event.Data, event.CPU)
118-
}
119-
120-
// notifyPerfEventLocked notifies all consumers about a perf event.
121-
// The caller must hold the monitor lock.
122-
func (a *monitorAgent) notifyPerfEventLocked(data []byte, cpu int) {
123-
for mc := range a.consumers {
124-
mc.NotifyPerfEvent(data, cpu)
125-
}
126-
}
127-
128101
func (a *monitorAgent) RegisterNewListener(newListener listener.MonitorListener) {
129102
if a == nil || newListener == nil {
130103
return

pkg/plugin/cilium/cilium_linux.go

+13-91
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package cilium
66

77
import (
8-
"bytes"
98
"context"
109
"encoding/gob"
1110
"errors"
@@ -14,16 +13,12 @@ import (
1413
"time"
1514

1615
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
17-
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
18-
"github.com/cilium/cilium/pkg/monitor"
19-
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
2016
"github.com/cilium/cilium/pkg/monitor/payload"
2117
kcfg "github.com/microsoft/retina/pkg/config"
2218
"github.com/microsoft/retina/pkg/enricher"
2319
"github.com/microsoft/retina/pkg/log"
2420
"github.com/microsoft/retina/pkg/plugin/api"
2521
"go.uber.org/zap"
26-
"google.golang.org/protobuf/types/known/timestamppb"
2722
)
2823

2924
const (
@@ -51,6 +46,10 @@ func (c *cilium) Compile(ctx context.Context) error {
5146
}
5247

5348
func (c *cilium) Init() error {
49+
c.p = &parser{
50+
l: c.l,
51+
}
52+
c.p.Init()
5453
c.l.Info("Initialized cilium plugin")
5554
return nil
5655
}
@@ -118,94 +117,17 @@ func (c *cilium) monitorLoop(ctx context.Context) error {
118117
c.l.Error("Failed to decode payload", zap.Error(err))
119118
return err
120119
}
121-
// monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc))
122-
123-
// hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor.
124-
// specifically, the hubble consumer adds the event to the observer's event channel
125-
126-
// Agent Events
127-
// - MessageTypeAccessLog: accesslog.LogRecord
128-
// - MessageTypeAgent: api.AgentNotify
129-
// Perf Events
130-
// - MessageTypeDrop: monitor.DropNotify
131-
// - MessageTypeDebug: monitor.DebugMsg
132-
// - MessageTypeCapture: monitor.DebugCapture
133-
// - MessageTypeTrace: monitor.TraceNotify
134-
// - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify
135-
136-
c.l.Debug("Received cilium event", zap.Int("type", pl.Type), zap.Any("payload", pl.Data))
137-
switch pl.Type {
138-
case payload.EventSample:
139-
data := pl.Data
140-
messageType := data[0]
141-
switch messageType {
142-
// Agent Events
143-
case monitorAPI.MessageTypeAccessLog:
144-
buf := bytes.NewBuffer(data[1:])
145-
dec := gob.NewDecoder(buf)
146-
lr := monitor.LogRecordNotify{}
147-
if err := dec.Decode(&lr); err != nil {
148-
c.l.Error("Failed to decode log record notify", zap.Error(err))
149-
continue
150-
}
151-
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: lr}
152-
case monitorAPI.MessageTypeAgent:
153-
buf := bytes.NewBuffer(data[1:])
154-
dec := gob.NewDecoder(buf)
155-
an := monitorAPI.AgentNotifyMessage{}
156-
if err := dec.Decode(&an); err != nil {
157-
c.l.Error("Failed to decode agent notify", zap.Error(err))
158-
continue
159-
}
160-
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: an}
161-
// Perf events
162-
// case monitorAPI.MessageTypeDrop:
163-
// dn := monitor.DropNotify{}
164-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil {
165-
// c.l.Error("Failed to decode drop notify", zap.Error(err))
166-
// continue
167-
// }
168-
// c.l.Info("Drop event", zap.Any("data", dn))
169-
// case monitorAPI.MessageTypeTrace:
170-
// tn := monitor.TraceNotify{}
171-
// if err := monitor.DecodeTraceNotify(data, &tn); err != nil {
172-
// c.l.Error("Failed to decode trace notify", zap.Error(err))
173-
// continue
174-
// }
175-
// c.l.Info("Trace event", zap.Any("data", tn))
176-
// case monitorAPI.MessageTypePolicyVerdict:
177-
// pn := monitor.PolicyVerdictNotify{}
178-
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil {
179-
// c.l.Error("Failed to decode policy verdict notify", zap.Error(err))
180-
// continue
181-
// }
182-
// c.l.Info("Policy verdict event", zap.Any("data", pn))
183-
// case monitorAPI.MessageTypeDebug:
184-
// c.l.Info("Debug event", zap.Any("data", data))
185-
// case monitorAPI.MessageTypeCapture:
186-
// c.l.Info("Capture event", zap.Any("data", data))
187-
// case monitorAPI.MessageTypeRecCapture:
188-
// c.l.Info("Recorder capture event", zap.Any("data", data))
189-
// case monitorAPI.MessageTypeTraceSock:
190-
// c.l.Info("Trace sock event", zap.Any("data", data))
191-
// default:
192-
// c.l.Info("Unknown event", zap.Any("data", data))
193-
default:
194-
ev := observerTypes.PerfEvent{
195-
CPU: pl.CPU,
196-
Data: pl.Data,
197-
}
198-
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: ev}
120+
fl, err := c.p.Decode(&pl)
121+
if err == nil {
122+
c.l.Debug("Decoded flow", zap.Any("flow", fl))
123+
event := &v1.Event{
124+
Event: fl,
125+
Timestamp: fl.GetTime(),
199126
}
200-
case payload.RecordLost:
201-
c.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost))
202-
default:
203-
c.l.Warn("Unknown event type", zap.Int("type", pl.Type))
204-
continue
127+
c.externalChannel <- event
128+
} else {
129+
c.l.Warn("Failed to decode to flow", zap.Error(err))
205130
}
206-
// if newMA, ok := c.monitorAgent.(AgentV2); ok {
207-
// newMA.SendPerfEvent(record)
208-
// }
209131
}
210132
}
211133
}

0 commit comments

Comments
 (0)