Skip to content

Commit

Permalink
Merge pull request #1 from vpidatala94/user/vpidatala/POC/7
Browse files Browse the repository at this point in the history
User/vpidatala/poc/7
  • Loading branch information
vpidatala94 authored Feb 12, 2025
2 parents f574448 + bd26f51 commit ddd0893
Show file tree
Hide file tree
Showing 18 changed files with 1,887 additions and 4 deletions.
2 changes: 1 addition & 1 deletion deploy/hubble/manifests/controller/helm/retina/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ apiServer:
logLevel: info

enabledPlugin_linux: '["linuxutil","packetforward","packetparser","dns", "dropreason"]'
enabledPlugin_win: '["hnsstats"]'
enabledPlugin_win: '["hnsstats","ebpfwindows"]'

enableTelemetry: false

Expand Down
4 changes: 2 additions & 2 deletions deploy/legacy/manifests/controller/helm/retina/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ apiServer:
logLevel: debug

enabledPlugin_linux: '["dropreason","packetforward","linuxutil","dns"]'
enabledPlugin_win: '["hnsstats"]'
enabledPlugin_win: '["hnsstats", "enabledPlugin_win"]'

enableTelemetry: false

Expand Down Expand Up @@ -268,7 +268,7 @@ metrics:
tlsConfig: {}
## @param metrics.serviceMonitor.relabelings [array] Prometheus relabeling rules to apply to samples before scraping
##
relabelings:
relabelings:
- sourceLabels:
[__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
separator: ":"
Expand Down
84 changes: 84 additions & 0 deletions pkg/plugin/ebpfwindows/dropreasons_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package ebpfwindows

import (
"fmt"
)

// DropMin numbers less than this are non-drop reason codes
var DropMin uint8 = 130

// DropInvalid is the Invalid packet reason.
var DropInvalid uint8 = 2

// These values are shared with bpf/lib/common.h and api/v1/flow/flow.proto.
var dropErrors = map[uint8]string{
0: "Success",
2: "Invalid packet",
3: "Plain Text",
4: "Interface Decrypted",
5: "LB: No backend slot entry found",
6: "LB: No backend entry found",
7: "LB: Reverse entry update failed",
8: "LB: Reverse entry stale",
9: "Fragmented packet",
10: "Fragmented packet entry update failed",
11: "Missed tail call to custom program",
}

// Keep in sync with __id_for_file in bpf/lib/source_info.h.
var files = map[uint8]string{

// source files from bpf/
1: "bpf_host.c",
2: "bpf_lxc.c",
3: "bpf_overlay.c",
4: "bpf_xdp.c",
5: "bpf_sock.c",
6: "bpf_network.c",

// header files from bpf/lib/
101: "arp.h",
102: "drop.h",
103: "srv6.h",
104: "icmp6.h",
105: "nodeport.h",
106: "lb.h",
107: "mcast.h",
108: "ipv4.h",
109: "conntrack.h",
110: "l3.h",
111: "trace.h",
112: "encap.h",
113: "encrypt.h",
}

// BPFFileName returns the file name for the given BPF file id.
func BPFFileName(id uint8) string {
if name, ok := files[id]; ok {
return name
}
return fmt.Sprintf("unknown(%d)", id)
}

func extendedReason(extError int8) string {
if extError == int8(0) {
return ""
}
return fmt.Sprintf("%d", extError)
}

func DropReasonExt(reason uint8, extError int8) string {
if err, ok := dropErrors[reason]; ok {
if ext := extendedReason(extError); ext == "" {
return err
} else {
return err + ", " + ext
}
}
return fmt.Sprintf("%d, %d", reason, extError)
}

// DropReason prints the drop reason in a human readable string
func DropReason(reason uint8) string {
return DropReasonExt(reason, int8(0))
}
240 changes: 240 additions & 0 deletions pkg/plugin/ebpfwindows/ebpf_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package ebpfwindows

import (
"context"
"errors"
"time"

//"fmt"

Check failure on line 8 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 8 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"unsafe"

v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observer "github.com/cilium/cilium/pkg/hubble/observer/types"
hp "github.com/cilium/cilium/pkg/hubble/parser"
kcfg "github.com/microsoft/retina/pkg/config"

//"github.com/google/uuid"

Check failure on line 16 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 16 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/metrics"
"github.com/microsoft/retina/pkg/plugin/registry"

//"github.com/microsoft/retina/pkg/utils"

Check failure on line 22 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

commentFormatting: put a space between `//` and comment text (gocritic)

Check failure on line 22 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentFormatting: put a space between `//` and comment text (gocritic)
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)

const (
// name of the ebpfwindows plugin
name string = "windowseBPF"
// name of the metrics
packetsReceived string = "win_packets_recv_count"
packetsSent string = "win_packets_sent_count"
bytesSent string = "win_bytes_sent_count"
bytesReceived string = "win_bytes_recv_count"
droppedPacketsIncoming string = "win_packets_recv_drop_count"
droppedPacketsOutgoing string = "win_packets_sent_drop_count"
// metrics direction
ingressLabel = "ingress"
egressLabel = "egress"
)

var (
ErrInvalidEventData = errors.New("The Cilium Event Data is invalid")
ErrNilEnricher = errors.New("enricher is nil")
)

// Plugin is the ebpfwindows plugin
type Plugin struct {
l *log.ZapLogger
cfg *kcfg.Config
enricher *enricher.Enricher
externalChannel chan *v1.Event
parser *hp.Parser
}

func init() {
registry.Add(name, New)
}

func New(cfg *kcfg.Config) registry.Plugin {
return &Plugin{
l: log.Logger().Named(name),
cfg: cfg,
}
}

// Init is a no-op for the ebpfwindows plugin
func (p *Plugin) Init() error {
parser, err := hp.New(logrus.WithField("cilium", "parser"),
nil,
nil,
nil,
nil,
nil,
nil,
nil,
)
if err != nil {
p.l.Fatal("Failed to create parser", zap.Error(err))
return err
}
p.parser = parser
return nil
}

// Name returns the name of the ebpfwindows plugin
func (p *Plugin) Name() string {
return name
}

// Start the plugin by starting a periodic timer.
func (p *Plugin) Start(ctx context.Context) error {
p.l.Info("Start ebpfWindows plugin...")
p.enricher = enricher.Instance()
p.pullCiliumMetricsAndEvents(ctx)
p.l.Info("Complete ebpfWindows plugin...")
return nil
}

// metricsMapIterateCallback is the callback function that is called for each key-value pair in the metrics map.
func (p *Plugin) metricsMapIterateCallback(key *MetricsKey, value *MetricsValues) {
p.l.Debug("MetricsMapIterateCallback")
p.l.Debug("Key", zap.String("Key", key.String()))
p.l.Debug("Value", zap.String("Value", value.String()))
if key.IsDrop() {
if key.IsEgress() {
metrics.DropPacketsGauge.WithLabelValues(egressLabel).Set(float64(value.Count()))
} else if key.IsIngress() {
metrics.DropPacketsGauge.WithLabelValues(ingressLabel).Set(float64(value.Count()))
}
} else {
if key.IsEgress() {
metrics.ForwardBytesGauge.WithLabelValues(egressLabel).Set(float64(value.Bytes()))
p.l.Debug("emitting bytes sent count metric", zap.Uint64(bytesSent, value.Bytes()))
metrics.WindowsGauge.WithLabelValues(packetsSent).Set(float64(value.Count()))
p.l.Debug("emitting packets sent count metric", zap.Uint64(packetsSent, value.Count()))
} else if key.IsIngress() {
metrics.ForwardPacketsGauge.WithLabelValues(ingressLabel).Set(float64(value.Count()))
p.l.Debug("emitting packets received count metric", zap.Uint64(packetsReceived, value.Count()))
metrics.ForwardBytesGauge.WithLabelValues(ingressLabel).Set(float64(value.Bytes()))
p.l.Debug("emitting bytes received count metric", zap.Uint64(bytesReceived, value.Bytes()))
}
}
}

// eventsMapCallback is the callback function that is called for each value in the events map.
func (p *Plugin) eventsMapCallback(data unsafe.Pointer, size uint64) int {
p.l.Debug("EventsMapCallback")
p.l.Debug("Size", zap.Uint64("Size", size))
err := p.handleTraceEvent(data, size)
if err != nil {
p.l.Error("Error handling trace event", zap.Error(err))
return -1
}
return 0
}

// pullCiliumeBPFMetrics is the function that is called periodically by the timer.
func (p *Plugin) pullCiliumMetricsAndEvents(ctx context.Context) {
eventsMap := NewEventsMap()
metricsMap := NewMetricsMap()
err := eventsMap.RegisterForCallback(p.eventsMapCallback)
if err != nil {
p.l.Error("Error registering for events map callback", zap.Error(err))
return
}
ticker := time.NewTicker(p.cfg.MetricsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := metricsMap.IterateWithCallback(p.metricsMapIterateCallback)
if err != nil {
p.l.Error("Error iterating metrics map", zap.Error(err))
}
case <-ctx.Done():
p.l.Error("ebpfwindows plugin canceling", zap.Error(ctx.Err()))
eventsMap.UnregisterForCallback()

Check failure on line 158 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, amd64)

Error return value of `eventsMap.UnregisterForCallback` is not checked (errcheck)

Check failure on line 158 in pkg/plugin/ebpfwindows/ebpf_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

Error return value of `eventsMap.UnregisterForCallback` is not checked (errcheck)
return
}
}
}

// SetupChannel saves the external channel to which the plugin will send events.
func (p *Plugin) SetupChannel(ch chan *v1.Event) error {
p.externalChannel = ch
return nil
}

// Stop the plugin by cancelling the periodic timer.
func (p *Plugin) Stop() error {
p.l.Info("Stop ebpfWindows plugin...")
return nil
}

// Compile is a no-op for the ebpfwindows plugin
func (p *Plugin) Compile(context.Context) error {
return nil
}

// Generate is a no-op for the ebpfwindows plugin
func (p *Plugin) Generate(context.Context) error {
return nil
}

func (p *Plugin) handleTraceEvent(data unsafe.Pointer, size uint64) error {
if uintptr(size) < unsafe.Sizeof(uint8(0)) {
return ErrInvalidEventData
}
eventType := *(*uint8)(data)
switch eventType {
case CiliumNotifyDrop:
if uintptr(size) < unsafe.Sizeof(DropNotify{}) {
p.l.Error("Invalid DropNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}
e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[unsafe.Sizeof(DropNotify{})]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}
p.enricher.Write(e)
case CiliumNotifyTrace:
if uintptr(size) < unsafe.Sizeof(TraceNotify{}) {
p.l.Error("Invalid TraceNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}
e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[unsafe.Sizeof(TraceNotify{})]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}

p.enricher.Write(e)
case CiliumNotifyTraceSock:
if uintptr(size) < unsafe.Sizeof(TraceSockNotify{}) {
p.l.Error("Invalid TraceSockNotify data size", zap.Uint64("size", size))
return ErrInvalidEventData
}
e, err := p.parser.Decode(&observer.MonitorEvent{
Payload: &observer.PerfEvent{
Data: (*[unsafe.Sizeof(TraceSockNotify{})]byte)(data)[:],
},
})
if err != nil {
p.l.Error("Could not convert event to flow", zap.Any("handleTraceEvent", data), zap.Error(err))
return ErrInvalidEventData
}
p.enricher.Write(e)
}
return nil
}
Loading

0 comments on commit ddd0893

Please sign in to comment.