From 0297485740f24cc6259cc803033efe63ca7348bc Mon Sep 17 00:00:00 2001 From: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> Date: Tue, 13 Aug 2024 10:45:51 -0700 Subject: [PATCH] fix: runs now but #536 is blocking this feature Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com> --- cmd/hubble/cells_linux.go | 6 ++ pkg/hubble/parser/layer34/parser_linux.go | 12 ++- pkg/k8s/cell_linux.go | 2 +- pkg/networkpolicy/netpolagent/netpolagent.go | 80 +++++++++++++++++--- pkg/networkpolicy/netpolagent/translation.go | 11 --- pkg/networkpolicy/netpolagent/types.go | 6 +- 6 files changed, 91 insertions(+), 26 deletions(-) diff --git a/cmd/hubble/cells_linux.go b/cmd/hubble/cells_linux.go index 90b64d41dd..b9c45cbf04 100644 --- a/cmd/hubble/cells_linux.go +++ b/cmd/hubble/cells_linux.go @@ -20,6 +20,8 @@ import ( retinak8s "github.com/microsoft/retina/pkg/k8s" "github.com/microsoft/retina/pkg/managers/pluginmanager" "github.com/microsoft/retina/pkg/monitoragent" + "github.com/microsoft/retina/pkg/networkpolicy" + "github.com/microsoft/retina/pkg/networkpolicy/netpolagent" "github.com/microsoft/retina/pkg/servermanager" "github.com/microsoft/retina/pkg/shared/telemetry" "k8s.io/client-go/rest" @@ -53,6 +55,8 @@ var ( // Parse Retina specific configuration config.Cell, + networkpolicy.Cell, + // Kubernetes client k8sClient.Cell, @@ -79,6 +83,8 @@ var ( // monitorAgent.Cell, monitoragent.Cell, + netpolagent.Cell, + daemonCell, // Provides the node reconciler diff --git a/pkg/hubble/parser/layer34/parser_linux.go b/pkg/hubble/parser/layer34/parser_linux.go index 7ad09ade9d..0c225b7a94 100644 --- a/pkg/hubble/parser/layer34/parser_linux.go +++ b/pkg/hubble/parser/layer34/parser_linux.go @@ -63,6 +63,12 @@ func (p *Parser) Decode(f *flow.Flow) *flow.Flow { // Add TrafficDirection to flow. p.decodeTrafficDirection(f) + p.l.WithFields(logrus.Fields{ + "verdict": f.GetVerdict(), + "src": f.GetSource().GetPodName(), + "dst": f.GetDestination().GetPodName(), + }).Info("DEBUGME: Decode bottom") + p.decodePoliciesDroppingTraffic(f) return f @@ -144,7 +150,7 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) { return } - ingressPolicies, egressPolicies := p.netpolAgent.PoliciesDroppingTraffic(f.Source, f.Destination) + ingressPolicies, egressPolicies := p.netpolAgent.PoliciesDroppingTraffic(f) // repurposing EgressAllowedBy and IngressAllowedBy to store policies DROPPING traffic. for _, metadata := range ingressPolicies { @@ -153,6 +159,8 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) { Namespace: metadata.Namespace, } f.IngressAllowedBy = append(f.IngressAllowedBy, fp) + + p.l.WithField("key", fmt.Sprintf("%s/%s", fp.Namespace, fp.Name)).Info("ingress policy dropping traffic") } for _, metadata := range egressPolicies { @@ -161,5 +169,7 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) { Namespace: metadata.Namespace, } f.EgressAllowedBy = append(f.EgressAllowedBy, fp) + + p.l.WithField("key", fmt.Sprintf("%s/%s", fp.Namespace, fp.Name)).Info("egress policy dropping traffic") } } diff --git a/pkg/k8s/cell_linux.go b/pkg/k8s/cell_linux.go index 72f90a8b57..ea7b8a140b 100644 --- a/pkg/k8s/cell_linux.go +++ b/pkg/k8s/cell_linux.go @@ -72,7 +72,7 @@ var Cell = cell.Module( }, ), - cell.Provide(func(lc cell.Lifecycle, cs client.Clientset, cfg *networkpolicy.Config) (resource.Resource[*slim_networkingv1.NetworkPolicy], error) { + cell.Provide(func(lc cell.Lifecycle, cs client.Clientset, cfg networkpolicy.Config) (resource.Resource[*slim_networkingv1.NetworkPolicy], error) { if cfg.EnableNetworkPolicyEnrichment { return ciliumk8s.NetworkPolicyResource(lc, cs) } diff --git a/pkg/networkpolicy/netpolagent/netpolagent.go b/pkg/networkpolicy/netpolagent/netpolagent.go index 246bcac3af..7c77dc1353 100644 --- a/pkg/networkpolicy/netpolagent/netpolagent.go +++ b/pkg/networkpolicy/netpolagent/netpolagent.go @@ -2,10 +2,12 @@ package netpolagent import ( "context" + "fmt" "strings" "github.com/microsoft/retina/pkg/networkpolicy" + v1 "k8s.io/api/core/v1" "sigs.k8s.io/network-policy-api/cmd/policy-assistant/pkg/matcher" "github.com/cilium/cilium/api/v1/flow" @@ -36,7 +38,7 @@ type agentParams struct { Lifecycle cell.Lifecycle Log logrus.FieldLogger Config networkpolicy.Config - npv1 resource.Resource[*slim_networkingv1.NetworkPolicy] + NPV1 resource.Resource[*slim_networkingv1.NetworkPolicy] } type NetPolAgent struct { @@ -62,7 +64,7 @@ func newNetPolAgent(p agentParams) *NetPolAgent { n := &NetPolAgent{ l: l, enabled: true, - npv1: p.npv1, + npv1: p.NPV1, store: newStore(l), wp: workerpool.New(maxWorkers), } @@ -91,7 +93,11 @@ func (n *NetPolAgent) Stop(_ cell.HookContext) error { // PoliciesDroppingTraffic returns the policies that are causing traffic to be dropped. // The first list is policies impacting ingress, the second list is policies impacting egress. // Only NetworkPolicyV1 is supported currently. -func (n *NetPolAgent) PoliciesDroppingTraffic(src, dst *flow.Endpoint) ([]*PolicyMetadata, []*PolicyMetadata) { +func (n *NetPolAgent) PoliciesDroppingTraffic(f *flow.Flow) ([]*PolicyMetadata, []*PolicyMetadata) { + n.l.Info("DEBUGME PoliciesDroppingTraffic") + + src, dst := f.GetSource(), f.GetDestination() + if !n.enabled || src == nil || dst == nil { return nil, nil } @@ -99,6 +105,21 @@ func (n *NetPolAgent) PoliciesDroppingTraffic(src, dst *flow.Endpoint) ([]*Polic traffic := &matcher.Traffic{ Source: endpointToTraffic(src), Destination: endpointToTraffic(dst), + // FIXME handle port names + } + + if tcp := f.GetL4().GetTCP(); tcp != nil { + traffic.Protocol = v1.ProtocolTCP + traffic.ResolvedPort = int(tcp.GetDestinationPort()) + } else if udp := f.GetL4().GetUDP(); udp != nil { + traffic.Protocol = v1.ProtocolUDP + traffic.ResolvedPort = int(udp.GetDestinationPort()) + } else if sctp := f.GetL4().GetSCTP(); sctp != nil { + traffic.Protocol = v1.ProtocolSCTP + traffic.ResolvedPort = int(sctp.GetDestinationPort()) + } else { + n.l.Debug("unsupported protocol") + return nil, nil } ingress := n.policiesDroppingTraffic(traffic, true) @@ -106,9 +127,46 @@ func (n *NetPolAgent) PoliciesDroppingTraffic(src, dst *flow.Endpoint) ([]*Polic return ingress, egress } +func prettyStringTraffic(t *matcher.Traffic) string { + if t == nil { + return "nil" + } + src := prettyStringInternalPeer(t.Source.Internal) + dst := prettyStringInternalPeer(t.Destination.Internal) + return fmt.Sprintf("Traffic{Source: %v, Destination: %v, Protocol: %v, ResolvedPort: %v, ResolvedPortName: %v}", src, dst, t.Protocol, t.ResolvedPort, t.ResolvedPortName) +} + +func prettyStringInternalPeer(p *matcher.InternalPeer) string { + if p == nil { + return "nil" + } + return fmt.Sprintf("InternalPeer{PodLabels: %v, NamespaceLabels: %v, Namespace: %v}", p.PodLabels, p.NamespaceLabels, p.Namespace) +} + +func prettyStringPolicy(p *matcher.Policy) string { + if p == nil { + return "nil" + } + return fmt.Sprintf("Policy{Ingress: %v, Egress: %v}", prettyStringTargets(p.Ingress), prettyStringTargets(p.Egress)) +} + +func prettyStringTargets(t map[string]*matcher.Target) string { + if t == nil { + return "nil" + } + var res []string + for k, v := range t { + prettyTarget := fmt.Sprintf("Target{Peer count: %d, SourceRules: %v}", len(v.Peers), v.SourceRules) + res = append(res, fmt.Sprintf("%v: %v", k, prettyTarget)) + } + return strings.Join(res, ", ") +} + func (n *NetPolAgent) policiesDroppingTraffic(traffic *matcher.Traffic, isIngress bool) []*PolicyMetadata { // NOTE: copied/modified from matcher.Policy.IsIngressOrEgressAllowed() + n.l.WithField("traffic", prettyStringTraffic(traffic)).Info("DEBUGME traffic") + var subject *matcher.TrafficPeer var peer *matcher.TrafficPeer if isIngress { @@ -127,6 +185,7 @@ func (n *NetPolAgent) policiesDroppingTraffic(traffic *matcher.Traffic, isIngres n.store.Lock() matchingTargets := n.store.allPolicies.TargetsApplyingToPod(isIngress, subject.Internal) + n.l.WithField("policy", prettyStringPolicy(n.store.allPolicies)).Info("DEBUGME traffic") n.store.Unlock() // 2. No targets match => automatic allow @@ -178,7 +237,6 @@ func (n *NetPolAgent) runNPV1Controller(ctx context.Context) error { return nil } - var err error switch ev.Kind { case resource.Sync: // ignore @@ -187,11 +245,7 @@ func (n *NetPolAgent) runNPV1Controller(ctx context.Context) error { case resource.Delete: n.store.DeleteNPV1(ev.Key) } - - if err != nil { - n.l.WithError(err).WithField("namespaceKey", ev.Key.String()).Error("error creating cilium endpoint. requeuing namespace") - } - ev.Done(err) + ev.Done(nil) case <-ctx.Done(): n.l.Info("stop reconciling npv1") return nil @@ -228,12 +282,18 @@ func endpointToTraffic(ep *flow.Endpoint) *matcher.TrafficPeer { continue } + shouldIgnore := false for _, prefix := range labelPrefixesToIgnore { if strings.HasPrefix(lbl.Key, prefix) { - continue + shouldIgnore = true + break } } + if shouldIgnore { + continue + } + podLabels[lbl.Key] = lbl.Value } diff --git a/pkg/networkpolicy/netpolagent/translation.go b/pkg/networkpolicy/netpolagent/translation.go index 8b66d85a03..a50c1b855f 100644 --- a/pkg/networkpolicy/netpolagent/translation.go +++ b/pkg/networkpolicy/netpolagent/translation.go @@ -143,17 +143,6 @@ func convertSlimIPBlock(slim *slim_networkingv1.IPBlock) *networkingv1.IPBlock { } } -func convertIntOrString(slim *intstr.IntOrString) *intstr.IntOrString { - if slim == nil { - return nil - } - return &intstr.IntOrString{ - Type: slim.Type, - IntVal: slim.IntVal, - StrVal: slim.StrVal, - } -} - func convertSlimPolicyTypes(slim []slim_networkingv1.PolicyType) []networkingv1.PolicyType { if slim == nil { return nil diff --git a/pkg/networkpolicy/netpolagent/types.go b/pkg/networkpolicy/netpolagent/types.go index ca65563cbd..23f74f7f9f 100644 --- a/pkg/networkpolicy/netpolagent/types.go +++ b/pkg/networkpolicy/netpolagent/types.go @@ -49,7 +49,7 @@ func (s *store) UpsertNPV1(key resource.Key, slim *slim_networkingv1.NetworkPoli } func (s *store) updateNPV1(key resource.Key, slim *slim_networkingv1.NetworkPolicy) { - s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("updating existing network policy") + s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("updating existing network policy") // have to recalculate all policies s.NetworkPolicies[key] = slim @@ -57,7 +57,7 @@ func (s *store) updateNPV1(key resource.Key, slim *slim_networkingv1.NetworkPoli } func (s *store) addNPV1(key resource.Key, slim *slim_networkingv1.NetworkPolicy) { - s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("adding new network policy") + s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("adding new network policy") s.NetworkPolicies[key] = slim npv1 := slimToNPV1(slim)[0] @@ -74,7 +74,7 @@ func (s *store) DeleteNPV1(key resource.Key) { return } - s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("deleting network policy") + s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("deleting network policy") delete(s.NetworkPolicies, key) // have to recalculate all policies