Skip to content

Commit

Permalink
update to add decoder for cilium plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
snguyen64 committed Aug 5, 2024
1 parent 24132d6 commit 5bc2c73
Show file tree
Hide file tree
Showing 6 changed files with 360 additions and 135 deletions.
16 changes: 1 addition & 15 deletions cmd/hubble/daemon_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ import (

"github.com/cilium/cilium/pkg/hive/cell"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/watchers"
"github.com/cilium/cilium/pkg/monitor"
monitoragent "github.com/cilium/cilium/pkg/monitor/agent"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/workerpool"

Expand Down Expand Up @@ -148,18 +145,7 @@ func (d *Daemon) generateEvents(ctx context.Context) {
return
case event := <-d.eventChan:
d.log.WithField("event", event).Debug("Sending event to monitor agent")
var err error
switch event.Event.(type) {
case monitor.LogRecordNotify:
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAccessLog, event.Event)
case monitorAPI.AgentNotifyMessage:
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAgent, event.Event)
case observerTypes.PerfEvent:
// send 1 here to signal bpf event
err = d.monitorAgent.SendEvent(1, event.Event)
default:
err = d.monitorAgent.SendEvent(0, event.Event)
}
err := d.monitorAgent.SendEvent(0, event)
if err != nil {
d.log.WithError(err).Error("Unable to send event to monitor agent")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/hubble/common/decoder.go
Original file line number Diff line number Diff line change
@@ -1 +1 @@
package common
package common
29 changes: 1 addition & 28 deletions pkg/monitoragent/monitoragent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"

"github.com/cilium/cilium/api/v1/models"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/monitor/agent/consumer"
"github.com/cilium/cilium/pkg/monitor/agent/listener"
Expand Down Expand Up @@ -65,19 +64,7 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
// While we want to avoid marshalling events if there are no active
// listeners, there's no need to check for active consumers ahead of time.

log.Info("SendEvent called")
// perfEvents are bpf datapath events (reserved for 0-128), where 0 = unspecified
if typ > 0 && typ <= 128 {
log.Info("SendEvent called with PERF event")
event, ok := event.(observerTypes.PerfEvent)
if !ok {
return fmt.Errorf("unexpected event type for perf event")
}
a.sendPerfEvent(event)
} else {
log.Info("SendEvent called with AGENT event")
a.notifyAgentEvent(typ, event)
}
a.notifyAgentEvent(typ, event)

// do not marshal notifications if there are no active listeners
if !a.hasListeners() {
Expand Down Expand Up @@ -111,20 +98,6 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
return nil
}

func (a *monitorAgent) sendPerfEvent(event observerTypes.PerfEvent) {
a.Lock()
defer a.Unlock()
a.notifyPerfEventLocked(event.Data, event.CPU)
}

// notifyPerfEventLocked notifies all consumers about a perf event.
// The caller must hold the monitor lock.
func (a *monitorAgent) notifyPerfEventLocked(data []byte, cpu int) {
for mc := range a.consumers {
mc.NotifyPerfEvent(data, cpu)
}
}

func (a *monitorAgent) RegisterNewListener(newListener listener.MonitorListener) {
if a == nil || newListener == nil {
return
Expand Down
104 changes: 13 additions & 91 deletions pkg/plugin/cilium/cilium_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package cilium

import (
"bytes"
"context"
"encoding/gob"
"errors"
Expand All @@ -14,16 +13,12 @@ import (
"time"

v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/monitor"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/monitor/payload"
kcfg "github.com/microsoft/retina/pkg/config"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/plugin/api"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
Expand Down Expand Up @@ -51,6 +46,10 @@ func (c *cilium) Compile(ctx context.Context) error {
}

func (c *cilium) Init() error {
c.p = &parser{
l: c.l,
}
c.p.Init()
c.l.Info("Initialized cilium plugin")
return nil
}
Expand Down Expand Up @@ -118,94 +117,17 @@ func (c *cilium) monitorLoop(ctx context.Context) error {
c.l.Error("Failed to decode payload", zap.Error(err))
return err
}
// monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc))

// hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor.
// specifically, the hubble consumer adds the event to the observer's event channel

// Agent Events
// - MessageTypeAccessLog: accesslog.LogRecord
// - MessageTypeAgent: api.AgentNotify
// Perf Events
// - MessageTypeDrop: monitor.DropNotify
// - MessageTypeDebug: monitor.DebugMsg
// - MessageTypeCapture: monitor.DebugCapture
// - MessageTypeTrace: monitor.TraceNotify
// - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify

c.l.Debug("Received cilium event", zap.Int("type", pl.Type), zap.Any("payload", pl.Data))
switch pl.Type {
case payload.EventSample:
data := pl.Data
messageType := data[0]
switch messageType {
// Agent Events
case monitorAPI.MessageTypeAccessLog:
buf := bytes.NewBuffer(data[1:])
dec := gob.NewDecoder(buf)
lr := monitor.LogRecordNotify{}
if err := dec.Decode(&lr); err != nil {
c.l.Error("Failed to decode log record notify", zap.Error(err))
continue
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: lr}
case monitorAPI.MessageTypeAgent:
buf := bytes.NewBuffer(data[1:])
dec := gob.NewDecoder(buf)
an := monitorAPI.AgentNotifyMessage{}
if err := dec.Decode(&an); err != nil {
c.l.Error("Failed to decode agent notify", zap.Error(err))
continue
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: an}
// Perf events
// case monitorAPI.MessageTypeDrop:
// dn := monitor.DropNotify{}
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil {
// c.l.Error("Failed to decode drop notify", zap.Error(err))
// continue
// }
// c.l.Info("Drop event", zap.Any("data", dn))
// case monitorAPI.MessageTypeTrace:
// tn := monitor.TraceNotify{}
// if err := monitor.DecodeTraceNotify(data, &tn); err != nil {
// c.l.Error("Failed to decode trace notify", zap.Error(err))
// continue
// }
// c.l.Info("Trace event", zap.Any("data", tn))
// case monitorAPI.MessageTypePolicyVerdict:
// pn := monitor.PolicyVerdictNotify{}
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil {
// c.l.Error("Failed to decode policy verdict notify", zap.Error(err))
// continue
// }
// c.l.Info("Policy verdict event", zap.Any("data", pn))
// case monitorAPI.MessageTypeDebug:
// c.l.Info("Debug event", zap.Any("data", data))
// case monitorAPI.MessageTypeCapture:
// c.l.Info("Capture event", zap.Any("data", data))
// case monitorAPI.MessageTypeRecCapture:
// c.l.Info("Recorder capture event", zap.Any("data", data))
// case monitorAPI.MessageTypeTraceSock:
// c.l.Info("Trace sock event", zap.Any("data", data))
// default:
// c.l.Info("Unknown event", zap.Any("data", data))
default:
ev := observerTypes.PerfEvent{
CPU: pl.CPU,
Data: pl.Data,
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: ev}
fl, err := c.p.Decode(&pl)
if err == nil {
c.l.Debug("Decoded flow", zap.Any("flow", fl))
event := &v1.Event{
Event: fl,
Timestamp: fl.GetTime(),
}
case payload.RecordLost:
c.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost))
default:
c.l.Warn("Unknown event type", zap.Int("type", pl.Type))
continue
c.externalChannel <- event
} else {
c.l.Warn("Failed to decode to flow", zap.Error(err))
}
// if newMA, ok := c.monitorAgent.(AgentV2); ok {
// newMA.SendPerfEvent(record)
// }
}
}
}
Loading

0 comments on commit 5bc2c73

Please sign in to comment.