Skip to content

Commit

Permalink
fix: update cilium dependency and refactor service decoding logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ritwikranjan committed Jan 14, 2025
1 parent de8a4c6 commit 6f3ce2f
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 58 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,5 @@ replace github.com/vishvananda/netns => github.com/inspektor-gadget/netns v0.0.5
replace k8s.io/perf-tests/network/benchmarks/netperf => github.com/Azure/perf-tests/network/benchmarks/netperf v0.0.0-20241008140716-395a79947d2c

replace go.universe.tf/metallb => github.com/cilium/metallb v0.1.1-0.20220829170633-5d7dfb1129f7

replace github.com/cilium/cilium => github.com/ritwikranjan/cilium v0.0.0-20250114123205-3b4fd6e1c18d
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ github.com/chai2010/gettext-go v1.0.2/go.mod h1:y+wnP2cHYaVj19NZhYKAwEMH2CI1gNHe
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/cilium v1.16.5 h1:ecjhh98fl6Ki641+8Cdb0oynsy3toQ+oPLCSI3d+KLE=
github.com/cilium/cilium v1.16.5/go.mod h1:EqOosPzJuv28Hz3Ulz6cCXfYKbll7vbIwMGZU5houOw=
github.com/cilium/dns v1.1.51-0.20240603182237-af788769786a h1:PRGN7B+72mj3OtLL2DM3F/9jp+ItgqgNS7mecgCmwsQ=
github.com/cilium/dns v1.1.51-0.20240603182237-af788769786a/go.mod h1:/7LC2GOgyXJ7maupZlaVIumYQiGPIgllSf6mA9sg6RU=
github.com/cilium/ebpf v0.5.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
Expand Down Expand Up @@ -813,6 +811,8 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/ritwikranjan/cilium v0.0.0-20250114123205-3b4fd6e1c18d h1:a0b9axv+dMplVejBUc1vp9gLkycvzfNo3a8fhwmm/FQ=
github.com/ritwikranjan/cilium v0.0.0-20250114123205-3b4fd6e1c18d/go.mod h1:EqOosPzJuv28Hz3Ulz6cCXfYKbll7vbIwMGZU5houOw=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
75 changes: 38 additions & 37 deletions pkg/hubble/common/decoder_linux.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package common

import (
"net"
"net/netip"
"os"

"github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/pkg/identity"
ipc "github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/k8s"
)

//go:generate go run github.com/golang/mock/[email protected] -source decoder.go -destination=mocks/mock_types.go -package=mocks
Expand Down Expand Up @@ -43,19 +44,19 @@ func (e *epDecoder) Decode(ip netip.Addr) *flow.Endpoint {
ep.ID = id.ID.Uint32()
ep.Identity = id.ID.Uint32()

switch id.ID { //nolint:exhaustive // We don't need all the cases.
case identity.ReservedIdentityHost:
ep.Labels = labels.LabelHost.GetModel()
case identity.ReservedIdentityKubeAPIServer:
ep.Labels = labels.LabelKubeAPIServer.GetModel()
case identity.ReservedIdentityRemoteNode:
ep.Labels = labels.LabelRemoteNode.GetModel()
case identity.ReservedIdentityWorld:
ep.Labels = labels.LabelWorld.GetModel()
default:
// https://github.com/cilium/cilium/commit/ba2eb6cc1da6b894c7bea23b31ed6a33292ba951#diff-bb1165de1a65911d59ac7f9a002aa9f1e25c30e073547adec11b174a174fdca6L192
// ep.Labels = e.ipcache.GetMetadataLabelsByIP(ip).GetModel()
}
// switch id.ID { //nolint:exhaustive // We don't need all the cases.
// case identity.ReservedIdentityHost:
// ep.Labels = labels.LabelHost.GetModel()
// case identity.ReservedIdentityKubeAPIServer:
// ep.Labels = labels.LabelKubeAPIServer.GetModel()
// case identity.ReservedIdentityRemoteNode:
// ep.Labels = labels.LabelRemoteNode.GetModel()
// case identity.ReservedIdentityWorld:
// ep.Labels = labels.LabelWorld.GetModel()
// default:
// // https://github.com/cilium/cilium/commit/ba2eb6cc1da6b894c7bea23b31ed6a33292ba951#diff-bb1165de1a65911d59ac7f9a002aa9f1e25c30e073547adec11b174a174fdca6L192
// // ep.Labels = e.ipcache.GetMetadataLabelsByIP(ip).GetModel()
// }

return ep
}
Expand All @@ -69,26 +70,26 @@ func (e *epDecoder) IsEndpointOnLocalHost(ip string) bool {
return e.localHostIP == e.endpointHostIP(ip)
}

// type SvcDecoder interface {
// Decode(ip netip.Addr) *flow.Service
// }
//
// type svcDecoder struct {
// svccache *k8s.ServiceCache
// }
//
// func NewSvcDecoder(sc *k8s.ServiceCache) *svcDecoder {
// return &svcDecoder{
// svccache: sc,
// }
// }
//
// func (s *svcDecoder) Decode(ip netip.Addr) *flow.Service {
// svc := &flow.Service{}
//
// if svcID, ok := s.svccache.GetServiceIDFromFrontendIP(ip.String()); ok {
// svc.Name = svcID.Name
// svc.Namespace = svcID.Namespace
// }
// return svc
// }
type SvcDecoder interface {
Decode(ip netip.Addr) *flow.Service
}

type svcDecoder struct {
svccache *k8s.ServiceCache
}

func NewSvcDecoder(sc *k8s.ServiceCache) *svcDecoder {

Check failure on line 81 in pkg/hubble/common/decoder_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, amd64)

unexported-return: exported func NewSvcDecoder returns unexported type *common.svcDecoder, which can be annoying to use (revive)

Check failure on line 81 in pkg/hubble/common/decoder_linux.go

View workflow job for this annotation

GitHub Actions / Lint (linux, arm64)

unexported-return: exported func NewSvcDecoder returns unexported type *common.svcDecoder, which can be annoying to use (revive)
return &svcDecoder{
svccache: sc,
}
}

func (s *svcDecoder) Decode(ip netip.Addr) *flow.Service {
svc := &flow.Service{}

if svcID, ok := s.svccache.GetServiceIDByIP(net.IP(ip.String())); ok {
svc.Name = svcID.Name
svc.Namespace = svcID.Namespace
}
return svc
}
6 changes: 5 additions & 1 deletion pkg/hubble/hubble_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cilium/cilium/pkg/hubble/server"
"github.com/cilium/cilium/pkg/hubble/server/serveroption"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/logging/logfields"
monitoragent "github.com/cilium/cilium/pkg/monitor/agent"
"github.com/cilium/cilium/pkg/option"
Expand All @@ -34,6 +35,7 @@ type RetinaHubble struct {
log *logrus.Entry
client client.Client
monitorAgent monitoragent.Agent
svc *k8s.ServiceCache
ipc *ipcache.IPCache
nodeReconciler *rnode.NodeReconciler
}
Expand All @@ -43,6 +45,7 @@ type hubbleParams struct {

Client client.Client
MonitorAgent monitoragent.Agent
ServiceCache *k8s.ServiceCache
IPCache *ipcache.IPCache
NodeReconciler *rnode.NodeReconciler
Log logrus.FieldLogger
Expand All @@ -53,6 +56,7 @@ func newRetinaHubble(params hubbleParams) *RetinaHubble {
log: params.Log.WithField(logfields.LogSubsys, "retina-hubble"),
client: params.Client,
monitorAgent: params.MonitorAgent,
svc: params.ServiceCache,
ipc: params.IPCache,
nodeReconciler: params.NodeReconciler,
}
Expand Down Expand Up @@ -115,7 +119,7 @@ func (rh *RetinaHubble) start(ctx context.Context) error {
)

// TODO: Replace with our custom parser.
payloadParser := parser.New(rh.log, rh.ipc)
payloadParser := parser.New(rh.log, rh.svc, rh.ipc)

namespaceManager := observer.NewNamespaceManager()
go namespaceManager.Run(ctx)
Expand Down
18 changes: 12 additions & 6 deletions pkg/hubble/parser/layer34/parser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@ import (
"net/netip"

"github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/pkg/ipcache"
ipc "github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"

"github.com/microsoft/retina/pkg/hubble/common"
"github.com/microsoft/retina/pkg/utils"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)

type Parser struct {
l *logrus.Entry
ep common.EpDecoder
l *logrus.Entry
svd common.SvcDecoder
ep common.EpDecoder
}

func New(l *logrus.Entry, c *ipcache.IPCache) *Parser {
func New(l *logrus.Entry, svc *k8s.ServiceCache, c *ipc.IPCache) *Parser {
p := &Parser{
l: l.WithField("subsys", "layer34"),
ep: common.NewEpDecoder(c),
l: l.WithField("subsys", "layer34"),
svd: common.NewSvcDecoder(svc),
ep: common.NewEpDecoder(c),
}
// Log the localHostIP for debugging purposes.
return p
Expand Down Expand Up @@ -49,6 +53,8 @@ func (p *Parser) Decode(f *flow.Flow) *flow.Flow {
// Decode the flow's source and destination IPs to their respective endpoints.
f.Source = p.ep.Decode(sourceIP)
f.Destination = p.ep.Decode(destIP)
f.SourceService = p.svd.Decode(sourceIP)
f.DestinationService = p.svd.Decode(destIP)

// Add L34 Summary to flow.
p.decodeSummary(f)
Expand Down
9 changes: 6 additions & 3 deletions pkg/hubble/parser/parser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
observer "github.com/cilium/cilium/pkg/hubble/observer/types"
ipc "github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
"github.com/microsoft/retina/pkg/hubble/parser/layer34"
"github.com/microsoft/retina/pkg/hubble/parser/seven"
"github.com/sirupsen/logrus"
Expand All @@ -24,18 +25,20 @@ var (
type Parser struct {
l logrus.FieldLogger
ipcache *ipc.IPCache
svc *k8s.ServiceCache

l34 *layer34.Parser
l7 *seven.Parser
}

func New(l *logrus.Entry, c *ipc.IPCache) *Parser {
func New(l *logrus.Entry, svc *k8s.ServiceCache, c *ipc.IPCache) *Parser {
return &Parser{
l: l,
ipcache: c,
svc: svc,

l34: layer34.New(l, c),
l7: seven.New(l, c),
l34: layer34.New(l, svc, c),
l7: seven.New(l, svc, c),
}
}

Expand Down
21 changes: 13 additions & 8 deletions pkg/hubble/parser/seven/parser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@ import (
"strings"

"github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/pkg/ipcache"
ipc "github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
"github.com/google/gopacket/layers"
"github.com/microsoft/retina/pkg/hubble/common"
"github.com/sirupsen/logrus"
"go.uber.org/zap"
)

type Parser struct {
l *logrus.Entry
ep common.EpDecoder
l *logrus.Entry
svd common.SvcDecoder
ep common.EpDecoder
}

func New(l *logrus.Entry, c *ipcache.IPCache) *Parser {
func New(l *logrus.Entry, svc *k8s.ServiceCache, c *ipc.IPCache) *Parser {
return &Parser{
l: l.WithField("subsys", "seven"),
ep: common.NewEpDecoder(c),
l: l.WithField("subsys", "seven"),
svd: common.NewSvcDecoder(svc),
ep: common.NewEpDecoder(c),
}
}

Expand All @@ -30,7 +33,7 @@ func (p *Parser) Decode(f *flow.Flow) *flow.Flow {
return nil
}

// Decode the flow's IP addresses to their respective endpoints.
// Decode the flow's IP addresses to their respective service.
p.decodeIP(f)

// Decode the flow's L7 protocol.
Expand Down Expand Up @@ -58,7 +61,7 @@ func (p *Parser) decodeIP(f *flow.Flow) {
return
}

// Decode the flow's source and destination IPs to their respective endpoints.
// Decode the flow's source and destination IPs to their respective service.
if f.GetIP() == nil {
p.l.Warn("Failed to get IP from flow", zap.Any("flow", f))
return
Expand All @@ -76,6 +79,8 @@ func (p *Parser) decodeIP(f *flow.Flow) {

f.Source = p.ep.Decode(sourceIP)
f.Destination = p.ep.Decode(destIP)
f.SourceService = p.svd.Decode(sourceIP)
f.DestinationService = p.svd.Decode(destIP)
}

func (p *Parser) decodeDNS(f *flow.Flow) *flow.Flow {
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8s/watcher_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Start(ctx context.Context, k *watchers.K8sWatcher) {

option.Config.K8sSyncTimeout = 3 * time.Minute //nolint:gomnd // this duration is self-explanatory
syncdCache := make(chan struct{})
go k.InitK8sSubsystem(ctx, syncdCache)
go k.InitK8sSubsystemWithResources(ctx, syncdCache, k8sResources)
logger.WithField("k8s resources", k8sResources).Info("Kubernetes watcher started, will wait for cache sync")

// Wait for K8s watcher to sync. If doesn't complete in 3 minutes, causes fatal error.
Expand Down

0 comments on commit 6f3ce2f

Please sign in to comment.