Skip to content

Commit

Permalink
feat(cil): add cilium plugin to read events off unix socket
Browse files Browse the repository at this point in the history
  • Loading branch information
snguyen64 committed Aug 1, 2024
1 parent cb7697a commit 24132d6
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 11 deletions.
8 changes: 5 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
{
"name": "retina",
"image": "mcr.microsoft.com/devcontainers/base:jammy",
// run args are used for cilium
"runArgs": ["--cap-add=NET_ADMIN", "--cap-add=SYS_MODULE"],
"features": {
"ghcr.io/devcontainers/features/common-utils:2": {},
"ghcr.io/devcontainers/features/docker-in-docker:2": {},
"ghcr.io/devcontainers/features/github-cli:1": {},
"ghcr.io/devcontainers/features/go:1": {},
"ghcr.io/devcontainers/features/kubectl-helm-minikube:1": {},
"ghcr.io/devcontainers-contrib/features/kind:1": {}
"ghcr.io/devcontainers/features/kubectl-helm-minikube:1": {}
// "ghcr.io/devcontainers-contrib/features/kind:1": {}
},
"postCreateCommand": "bash .devcontainer/installMoreTools.sh && kind create cluster",
"postCreateCommand": "bash .devcontainer/installMoreTools.sh && minikube start --driver=docker",
"customizations": {
"vscode": {
"extensions": [
Expand Down
6 changes: 6 additions & 0 deletions .devcontainer/installCilium.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash
# Downloads the latest cilium CLI release for linux/amd64, installs the binary
# into /usr/local/bin and then runs `cilium install`.
#
# Abort on the first failing command, on use of an unset variable, and on a
# failure anywhere in a pipeline. Without this, a failed download would still
# fall through to `cilium install`.
set -euo pipefail

archive="cilium-linux-amd64.tar.gz"

# -f makes curl exit non-zero on an HTTP error instead of saving the error page
# under the archive name.
curl -fL --remote-name "https://github.com/cilium/cilium-cli/releases/latest/download/${archive}"
tar xzvf "${archive}"
sudo mv cilium /usr/local/bin/
rm "${archive}"
cilium install
1 change: 1 addition & 0 deletions .devcontainer/installMoreTools.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Install the required tools and dependencies
sudo apt-get update && sudo apt-get install -y lsb-release wget software-properties-common gnupg clang-14 lldb-14 lld-14 clangd-14 man-db
export PATH=$PATH:/usr/lib/llvm-14/bin

# Install LLVM 14
export LLVM_VERSION=14
Expand Down
16 changes: 15 additions & 1 deletion cmd/hubble/daemon_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (

"github.com/cilium/cilium/pkg/hive/cell"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/watchers"
"github.com/cilium/cilium/pkg/monitor"
monitoragent "github.com/cilium/cilium/pkg/monitor/agent"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/workerpool"

Expand Down Expand Up @@ -145,7 +148,18 @@ func (d *Daemon) generateEvents(ctx context.Context) {
return
case event := <-d.eventChan:
d.log.WithField("event", event).Debug("Sending event to monitor agent")
err := d.monitorAgent.SendEvent(0, event)
var err error
switch event.Event.(type) {
case monitor.LogRecordNotify:
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAccessLog, event.Event)
case monitorAPI.AgentNotifyMessage:
err = d.monitorAgent.SendEvent(monitorAPI.MessageTypeAgent, event.Event)
case observerTypes.PerfEvent:
// send 1 here to signal bpf event
err = d.monitorAgent.SendEvent(1, event.Event)
default:
err = d.monitorAgent.SendEvent(0, event.Event)
}
if err != nil {
d.log.WithError(err).Error("Unable to send event to monitor agent")
}
Expand Down
1 change: 1 addition & 0 deletions deploy/legacy/manifests/controller/helm/retina/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ volumeMounts:
cgroup: /sys/fs/cgroup
tmp: /tmp
config: /retina/config
varrun: /var/run

#volume mounts for windows
volumeMounts_win:
Expand Down
1 change: 1 addition & 0 deletions pkg/hubble/common/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package common
11 changes: 5 additions & 6 deletions pkg/monitoragent/cell_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package monitoragent

import (
"context"
"fmt"

"github.com/cilium/cilium/pkg/common"
"github.com/cilium/cilium/pkg/defaults"
Expand Down Expand Up @@ -74,11 +73,11 @@ func newMonitorAgent(params agentParams) ciliumagent.Agent {
}
}

monitorErr := ciliumagent.ServeMonitorAPI(ctx, agent, queueSize)
if monitorErr != nil {
log.WithError(monitorErr).Error("encountered error serving monitor agent API")
return fmt.Errorf("encountered error serving monitor agent API: %w", monitorErr)
}
// monitorErr := ciliumagent.ServeMonitorAPI(ctx, agent, queueSize)

Check failure on line 76 in pkg/monitoragent/cell_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

commentedOutCode: may want to remove commented-out code (gocritic)

Check failure on line 76 in pkg/monitoragent/cell_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

commentedOutCode: may want to remove commented-out code (gocritic)
// if monitorErr != nil {
// log.WithError(monitorErr).Error("encountered error serving monitor agent API")
// return fmt.Errorf("encountered error serving monitor agent API: %w", monitorErr)
// }
}
return err
},
Expand Down
29 changes: 28 additions & 1 deletion pkg/monitoragent/monitoragent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/cilium/cilium/api/v1/models"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/monitor/agent/consumer"
"github.com/cilium/cilium/pkg/monitor/agent/listener"
Expand Down Expand Up @@ -64,7 +65,19 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
// While we want to avoid marshalling events if there are no active
// listeners, there's no need to check for active consumers ahead of time.

a.notifyAgentEvent(typ, event)
log.Info("SendEvent called")
// perfEvents are bpf datapath events (reserved for 0-128), where 0 = unspecified
if typ > 0 && typ <= 128 {
log.Info("SendEvent called with PERF event")
event, ok := event.(observerTypes.PerfEvent)
if !ok {
return fmt.Errorf("unexpected event type for perf event")

Check failure on line 74 in pkg/monitoragent/monitoragent_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

err113: do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"unexpected event type for perf event\")" (goerr113)

Check failure on line 74 in pkg/monitoragent/monitoragent_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

err113: do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"unexpected event type for perf event\")" (goerr113)
}
a.sendPerfEvent(event)
} else {
log.Info("SendEvent called with AGENT event")
a.notifyAgentEvent(typ, event)
}

// do not marshal notifications if there are no active listeners
if !a.hasListeners() {
Expand Down Expand Up @@ -98,6 +111,20 @@ func (a *monitorAgent) SendEvent(typ int, event interface{}) error {
return nil
}

// sendPerfEvent takes the agent lock and passes the event's raw data and CPU
// number on to notifyPerfEventLocked.
func (a *monitorAgent) sendPerfEvent(event observerTypes.PerfEvent) {
	raw, cpu := event.Data, event.CPU

	a.Lock()
	defer a.Unlock()

	a.notifyPerfEventLocked(raw, cpu)
}

// notifyPerfEventLocked calls NotifyPerfEvent with the given data and CPU on
// every entry ranged over in a.consumers.
// Callers are responsible for holding the monitor lock.
func (a *monitorAgent) notifyPerfEventLocked(data []byte, cpu int) {
	for sink := range a.consumers {
		sink.NotifyPerfEvent(data, cpu)
	}
}

func (a *monitorAgent) RegisterNewListener(newListener listener.MonitorListener) {
if a == nil || newListener == nil {
return
Expand Down
211 changes: 211 additions & 0 deletions pkg/plugin/cilium/cilium_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

// Package cilium contains the Retina cilium plugin. It reads events from the cilium monitor unix socket and forwards them to the external event channel.
package cilium

import (
"bytes"
"context"
"encoding/gob"
"errors"
"io"
"net"
"time"

v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observerTypes "github.com/cilium/cilium/pkg/hubble/observer/types"
"github.com/cilium/cilium/pkg/monitor"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/monitor/payload"
kcfg "github.com/microsoft/retina/pkg/config"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/plugin/api"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
// MonitorSockPath1_2 is the path of the unix socket that the plugin dials to
// receive cilium monitor events.
MonitorSockPath1_2 = "/var/run/cilium/monitor1_2.sock"
// connectionTimeout is presumably the delay, in seconds, between attempts to
// connect to the monitor socket.
// NOTE(review): confirm the unit and where this constant is meant to be used.
connectionTimeout = 12
)

// New builds a cilium plugin that keeps the supplied configuration and logs
// through a logger named after the plugin.
func New(cfg *kcfg.Config) api.Plugin {
	logger := log.Logger().Named(string(Name))
	plugin := &cilium{
		cfg: cfg,
		l:   logger,
	}
	return plugin
}

// Name returns the plugin's name constant as a plain string.
func (c *cilium) Name() string {
	pluginName := string(Name)
	return pluginName
}

func (c *cilium) Generate(ctx context.Context) error {

Check failure on line 45 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 45 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
return nil
}

func (c *cilium) Compile(ctx context.Context) error {

Check failure on line 49 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check failure on line 49 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
return nil
}

// Init logs that the plugin has been initialized. It performs no other setup
// and always returns nil.
func (c *cilium) Init() error {
	logger := c.l
	logger.Info("Initialized cilium plugin")
	return nil
}

// Start optionally attaches the retina enricher, launches the monitor
// goroutine and then blocks until ctx is done. It always returns nil.
func (c *cilium) Start(ctx context.Context) error {
	// The enricher is only looked up when pod-level mode is enabled; if it
	// has not been initialized the plugin runs without one.
	switch {
	case !c.cfg.EnablePodLevel:
		// Pod-level mode is off: leave the enricher unset.
	case enricher.IsInitialized():
		c.enricher = enricher.Instance()
	default:
		c.l.Warn("retina enricher is not initialized")
	}

	// Read events from the cilium monitor socket in the background.
	go c.monitor(ctx)

	<-ctx.Done()
	return nil
}

// Stop logs that the plugin has stopped and always returns nil. It does not
// itself close the monitor connection.
func (c *cilium) Stop() error {
	logger := c.l
	logger.Info("Stopped cilium plugin")
	return nil
}

// SetupChannel stores ch as the channel that decoded monitor events are sent
// to. It always returns nil.
func (c *cilium) SetupChannel(ch chan *v1.Event) error {
	var err error
	c.externalChannel = ch
	return err
}

// Create a connection to the cilium unix socket to monitor events
func (c *cilium) monitor(ctx context.Context) {
// Start the cilium monitor
for ; ; time.Sleep(12) {

Check failure on line 88 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

mnd: Magic number: 12, in <argument> detected (gomnd)

Check failure on line 88 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

mnd: Magic number: 12, in <argument> detected (gomnd)
conn, err := net.Dial("unix", MonitorSockPath1_2)
if err != nil {
c.l.Error("Failed to connect to cilium monitor", zap.Error(err))
continue
}
c.l.Info("Connected to cilium monitor")
c.connection = conn
err = c.monitorLoop(ctx)
if err != nil {
c.l.Error("Monitor loop exited with error", zap.Error(err))
} else if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
c.l.Warn("Connection was closed")
}
}
}

// monitor events from uds connection
func (c *cilium) monitorLoop(ctx context.Context) error {
defer c.connection.Close()
decoder := gob.NewDecoder(c.connection)
var pl payload.Payload

for {
select {
case <-ctx.Done():
c.l.Info("Context done, exiting monitor loop")
return nil
default:
if err := pl.DecodeBinary(decoder); err != nil {
c.l.Error("Failed to decode payload", zap.Error(err))
return err

Check failure on line 119 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

error returned from external package is unwrapped: sig: func (*github.com/cilium/cilium/pkg/monitor/payload.Payload).DecodeBinary(dec *encoding/gob.Decoder) error (wrapcheck)

Check failure on line 119 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

error returned from external package is unwrapped: sig: func (*github.com/cilium/cilium/pkg/monitor/payload.Payload).DecodeBinary(dec *encoding/gob.Decoder) error (wrapcheck)
}
// monitorAgent.SendEvents - is used to notify for Agent Events (Access Log (proxy) and Agent Notify (cilium agent events - crud for ep, policy, svc))

// hubble monitorConsumer.sendEvent -- (NotifyPerfEvent) this func sends a monitorEvent to the consumer from hubble monitor.
// specifically, the hubble consumer adds the event to the observer's event channel

// Agent Events
// - MessageTypeAccessLog: accesslog.LogRecord
// - MessageTypeAgent: api.AgentNotify
// Perf Events
// - MessageTypeDrop: monitor.DropNotify
// - MessageTypeDebug: monitor.DebugMsg
// - MessageTypeCapture: monitor.DebugCapture
// - MessageTypeTrace: monitor.TraceNotify
// - MessageTypePolicyVerdict: monitor.PolicyVerdictNotify

c.l.Debug("Received cilium event", zap.Int("type", pl.Type), zap.Any("payload", pl.Data))
switch pl.Type {
case payload.EventSample:
data := pl.Data
messageType := data[0]
switch messageType {
// Agent Events
case monitorAPI.MessageTypeAccessLog:
buf := bytes.NewBuffer(data[1:])
dec := gob.NewDecoder(buf)
lr := monitor.LogRecordNotify{}
if err := dec.Decode(&lr); err != nil {
c.l.Error("Failed to decode log record notify", zap.Error(err))
continue
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: lr}
case monitorAPI.MessageTypeAgent:
buf := bytes.NewBuffer(data[1:])
dec := gob.NewDecoder(buf)
an := monitorAPI.AgentNotifyMessage{}
if err := dec.Decode(&an); err != nil {
c.l.Error("Failed to decode agent notify", zap.Error(err))
continue
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: an}
// Perf events
// case monitorAPI.MessageTypeDrop:
// dn := monitor.DropNotify{}
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &dn); err != nil {
// c.l.Error("Failed to decode drop notify", zap.Error(err))
// continue
// }
// c.l.Info("Drop event", zap.Any("data", dn))
// case monitorAPI.MessageTypeTrace:
// tn := monitor.TraceNotify{}
// if err := monitor.DecodeTraceNotify(data, &tn); err != nil {
// c.l.Error("Failed to decode trace notify", zap.Error(err))
// continue
// }
// c.l.Info("Trace event", zap.Any("data", tn))
// case monitorAPI.MessageTypePolicyVerdict:
// pn := monitor.PolicyVerdictNotify{}
// if err := binary.Read(bytes.NewReader(data), byteorder.Native, &pn); err != nil {
// c.l.Error("Failed to decode policy verdict notify", zap.Error(err))
// continue
// }
// c.l.Info("Policy verdict event", zap.Any("data", pn))
// case monitorAPI.MessageTypeDebug:
// c.l.Info("Debug event", zap.Any("data", data))
// case monitorAPI.MessageTypeCapture:
// c.l.Info("Capture event", zap.Any("data", data))
// case monitorAPI.MessageTypeRecCapture:
// c.l.Info("Recorder capture event", zap.Any("data", data))
// case monitorAPI.MessageTypeTraceSock:
// c.l.Info("Trace sock event", zap.Any("data", data))
// default:
// c.l.Info("Unknown event", zap.Any("data", data))
default:
ev := observerTypes.PerfEvent{
CPU: pl.CPU,
Data: pl.Data,
}
c.externalChannel <- &v1.Event{Timestamp: timestamppb.Now(), Event: ev}
}
case payload.RecordLost:
c.l.Warn("Record lost for cilium event", zap.Uint64("lost", pl.Lost))
default:
c.l.Warn("Unknown event type", zap.Int("type", pl.Type))
continue
}
// if newMA, ok := c.monitorAgent.(AgentV2); ok {

Check failure on line 206 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

commentedOutCode: may want to remove commented-out code (gocritic)

Check failure on line 206 in pkg/plugin/cilium/cilium_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

commentedOutCode: may want to remove commented-out code (gocritic)
// newMA.SendPerfEvent(record)
// }
}
}
}
1 change: 1 addition & 0 deletions pkg/plugin/cilium/cilium_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cilium
25 changes: 25 additions & 0 deletions pkg/plugin/cilium/types_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package cilium

import (
"net"

v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
kcfg "github.com/microsoft/retina/pkg/config"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/plugin/api"
)

const (
// Name is the cilium plugin's name. It is returned by the plugin's Name
// method and used to name the plugin's logger.
Name api.PluginName = "cilium"
)

// cilium is the plugin state: its configuration, logger, optional enricher,
// the channel events are forwarded to, and the current monitor connection.
type cilium struct {
// cfg is the configuration passed to New; Start reads EnablePodLevel from it.
cfg *kcfg.Config
// l is the plugin's logger, named after the plugin.
l *log.ZapLogger
// enricher is set by Start when pod-level mode is enabled and the retina
// enricher has been initialized; otherwise it is left unset.
enricher enricher.EnricherInterface
// externalChannel is the channel given to SetupChannel; events read from the
// monitor socket are sent to it.
externalChannel chan *v1.Event
// connection is the most recently dialed connection to the cilium monitor
// unix socket.
connection net.Conn
}
Loading

0 comments on commit 24132d6

Please sign in to comment.