Skip to content

Commit

Permalink
fix: runs now but #536 is blocking this feature
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Gregory <[email protected]>
  • Loading branch information
huntergregory committed Aug 13, 2024
1 parent d9c91a4 commit 0297485
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 26 deletions.
6 changes: 6 additions & 0 deletions cmd/hubble/cells_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
retinak8s "github.com/microsoft/retina/pkg/k8s"
"github.com/microsoft/retina/pkg/managers/pluginmanager"
"github.com/microsoft/retina/pkg/monitoragent"
"github.com/microsoft/retina/pkg/networkpolicy"
"github.com/microsoft/retina/pkg/networkpolicy/netpolagent"
"github.com/microsoft/retina/pkg/servermanager"
"github.com/microsoft/retina/pkg/shared/telemetry"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -53,6 +55,8 @@ var (
// Parse Retina specific configuration
config.Cell,

networkpolicy.Cell,

// Kubernetes client
k8sClient.Cell,

Expand All @@ -79,6 +83,8 @@ var (
// monitorAgent.Cell,
monitoragent.Cell,

netpolagent.Cell,

daemonCell,

// Provides the node reconciler
Expand Down
12 changes: 11 additions & 1 deletion pkg/hubble/parser/layer34/parser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func (p *Parser) Decode(f *flow.Flow) *flow.Flow {
// Add TrafficDirection to flow.
p.decodeTrafficDirection(f)

p.l.WithFields(logrus.Fields{
"verdict": f.GetVerdict(),
"src": f.GetSource().GetPodName(),
"dst": f.GetDestination().GetPodName(),
}).Info("DEBUGME: Decode bottom")

p.decodePoliciesDroppingTraffic(f)

return f
Expand Down Expand Up @@ -144,7 +150,7 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) {
return
}

ingressPolicies, egressPolicies := p.netpolAgent.PoliciesDroppingTraffic(f.Source, f.Destination)
ingressPolicies, egressPolicies := p.netpolAgent.PoliciesDroppingTraffic(f)

// repurposing EgressAllowedBy and IngressAllowedBy to store policies DROPPING traffic.
for _, metadata := range ingressPolicies {
Expand All @@ -153,6 +159,8 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) {
Namespace: metadata.Namespace,
}
f.IngressAllowedBy = append(f.IngressAllowedBy, fp)

p.l.WithField("key", fmt.Sprintf("%s/%s", fp.Namespace, fp.Name)).Info("ingress policy dropping traffic")
}

for _, metadata := range egressPolicies {
Expand All @@ -161,5 +169,7 @@ func (p *Parser) decodePoliciesDroppingTraffic(f *flow.Flow) {
Namespace: metadata.Namespace,
}
f.EgressAllowedBy = append(f.EgressAllowedBy, fp)

p.l.WithField("key", fmt.Sprintf("%s/%s", fp.Namespace, fp.Name)).Info("egress policy dropping traffic")
}
}
2 changes: 1 addition & 1 deletion pkg/k8s/cell_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var Cell = cell.Module(
},
),

cell.Provide(func(lc cell.Lifecycle, cs client.Clientset, cfg *networkpolicy.Config) (resource.Resource[*slim_networkingv1.NetworkPolicy], error) {
cell.Provide(func(lc cell.Lifecycle, cs client.Clientset, cfg networkpolicy.Config) (resource.Resource[*slim_networkingv1.NetworkPolicy], error) {
if cfg.EnableNetworkPolicyEnrichment {
return ciliumk8s.NetworkPolicyResource(lc, cs)
}
Expand Down
80 changes: 70 additions & 10 deletions pkg/networkpolicy/netpolagent/netpolagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package netpolagent

import (
"context"
"fmt"
"strings"

"github.com/microsoft/retina/pkg/networkpolicy"

v1 "k8s.io/api/core/v1"
"sigs.k8s.io/network-policy-api/cmd/policy-assistant/pkg/matcher"

"github.com/cilium/cilium/api/v1/flow"
Expand Down Expand Up @@ -36,7 +38,7 @@ type agentParams struct {
Lifecycle cell.Lifecycle
Log logrus.FieldLogger
Config networkpolicy.Config
npv1 resource.Resource[*slim_networkingv1.NetworkPolicy]
NPV1 resource.Resource[*slim_networkingv1.NetworkPolicy]
}

type NetPolAgent struct {
Expand All @@ -62,7 +64,7 @@ func newNetPolAgent(p agentParams) *NetPolAgent {
n := &NetPolAgent{
l: l,
enabled: true,
npv1: p.npv1,
npv1: p.NPV1,
store: newStore(l),
wp: workerpool.New(maxWorkers),
}
Expand Down Expand Up @@ -91,24 +93,80 @@ func (n *NetPolAgent) Stop(_ cell.HookContext) error {
// PoliciesDroppingTraffic returns the policies that are causing traffic to be dropped.
// The first list is policies impacting ingress, the second list is policies impacting egress.
// Only NetworkPolicyV1 is supported currently.
func (n *NetPolAgent) PoliciesDroppingTraffic(src, dst *flow.Endpoint) ([]*PolicyMetadata, []*PolicyMetadata) {
func (n *NetPolAgent) PoliciesDroppingTraffic(f *flow.Flow) ([]*PolicyMetadata, []*PolicyMetadata) {
n.l.Info("DEBUGME PoliciesDroppingTraffic")

src, dst := f.GetSource(), f.GetDestination()

if !n.enabled || src == nil || dst == nil {
return nil, nil
}

traffic := &matcher.Traffic{
Source: endpointToTraffic(src),
Destination: endpointToTraffic(dst),
// FIXME handle port names
}

if tcp := f.GetL4().GetTCP(); tcp != nil {
traffic.Protocol = v1.ProtocolTCP
traffic.ResolvedPort = int(tcp.GetDestinationPort())
} else if udp := f.GetL4().GetUDP(); udp != nil {
traffic.Protocol = v1.ProtocolUDP
traffic.ResolvedPort = int(udp.GetDestinationPort())
} else if sctp := f.GetL4().GetSCTP(); sctp != nil {
traffic.Protocol = v1.ProtocolSCTP
traffic.ResolvedPort = int(sctp.GetDestinationPort())
} else {
n.l.Debug("unsupported protocol")
return nil, nil
}

ingress := n.policiesDroppingTraffic(traffic, true)
egress := n.policiesDroppingTraffic(traffic, false)
return ingress, egress
}

func prettyStringTraffic(t *matcher.Traffic) string {
if t == nil {
return "nil"
}
src := prettyStringInternalPeer(t.Source.Internal)
dst := prettyStringInternalPeer(t.Destination.Internal)
return fmt.Sprintf("Traffic{Source: %v, Destination: %v, Protocol: %v, ResolvedPort: %v, ResolvedPortName: %v}", src, dst, t.Protocol, t.ResolvedPort, t.ResolvedPortName)
}

func prettyStringInternalPeer(p *matcher.InternalPeer) string {
if p == nil {
return "nil"
}
return fmt.Sprintf("InternalPeer{PodLabels: %v, NamespaceLabels: %v, Namespace: %v}", p.PodLabels, p.NamespaceLabels, p.Namespace)
}

func prettyStringPolicy(p *matcher.Policy) string {
if p == nil {
return "nil"
}
return fmt.Sprintf("Policy{Ingress: %v, Egress: %v}", prettyStringTargets(p.Ingress), prettyStringTargets(p.Egress))
}

func prettyStringTargets(t map[string]*matcher.Target) string {
if t == nil {
return "nil"
}
var res []string
for k, v := range t {
prettyTarget := fmt.Sprintf("Target{Peer count: %d, SourceRules: %v}", len(v.Peers), v.SourceRules)
res = append(res, fmt.Sprintf("%v: %v", k, prettyTarget))
}
return strings.Join(res, ", ")
}

func (n *NetPolAgent) policiesDroppingTraffic(traffic *matcher.Traffic, isIngress bool) []*PolicyMetadata {
// NOTE: copied/modified from matcher.Policy.IsIngressOrEgressAllowed()

n.l.WithField("traffic", prettyStringTraffic(traffic)).Info("DEBUGME traffic")

var subject *matcher.TrafficPeer
var peer *matcher.TrafficPeer
if isIngress {
Expand All @@ -127,6 +185,7 @@ func (n *NetPolAgent) policiesDroppingTraffic(traffic *matcher.Traffic, isIngres

n.store.Lock()
matchingTargets := n.store.allPolicies.TargetsApplyingToPod(isIngress, subject.Internal)
n.l.WithField("policy", prettyStringPolicy(n.store.allPolicies)).Info("DEBUGME traffic")
n.store.Unlock()

// 2. No targets match => automatic allow
Expand Down Expand Up @@ -178,7 +237,6 @@ func (n *NetPolAgent) runNPV1Controller(ctx context.Context) error {
return nil
}

var err error
switch ev.Kind {
case resource.Sync:
// ignore
Expand All @@ -187,11 +245,7 @@ func (n *NetPolAgent) runNPV1Controller(ctx context.Context) error {
case resource.Delete:
n.store.DeleteNPV1(ev.Key)
}

if err != nil {
n.l.WithError(err).WithField("namespaceKey", ev.Key.String()).Error("error creating cilium endpoint. requeuing namespace")
}
ev.Done(err)
ev.Done(nil)
case <-ctx.Done():
n.l.Info("stop reconciling npv1")
return nil
Expand Down Expand Up @@ -228,12 +282,18 @@ func endpointToTraffic(ep *flow.Endpoint) *matcher.TrafficPeer {
continue
}

shouldIgnore := false
for _, prefix := range labelPrefixesToIgnore {
if strings.HasPrefix(lbl.Key, prefix) {
continue
shouldIgnore = true
break
}
}

if shouldIgnore {
continue
}

podLabels[lbl.Key] = lbl.Value
}

Expand Down
11 changes: 0 additions & 11 deletions pkg/networkpolicy/netpolagent/translation.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,6 @@ func convertSlimIPBlock(slim *slim_networkingv1.IPBlock) *networkingv1.IPBlock {
}
}

func convertIntOrString(slim *intstr.IntOrString) *intstr.IntOrString {
if slim == nil {
return nil
}
return &intstr.IntOrString{
Type: slim.Type,
IntVal: slim.IntVal,
StrVal: slim.StrVal,
}
}

func convertSlimPolicyTypes(slim []slim_networkingv1.PolicyType) []networkingv1.PolicyType {
if slim == nil {
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/networkpolicy/netpolagent/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func (s *store) UpsertNPV1(key resource.Key, slim *slim_networkingv1.NetworkPoli
}

func (s *store) updateNPV1(key resource.Key, slim *slim_networkingv1.NetworkPolicy) {
s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("updating existing network policy")
s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("updating existing network policy")

// have to recalculate all policies
s.NetworkPolicies[key] = slim
s.rebuildPolicies()
}

func (s *store) addNPV1(key resource.Key, slim *slim_networkingv1.NetworkPolicy) {
s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("adding new network policy")
s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("adding new network policy")

s.NetworkPolicies[key] = slim
npv1 := slimToNPV1(slim)[0]
Expand All @@ -74,7 +74,7 @@ func (s *store) DeleteNPV1(key resource.Key) {
return
}

s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Debug("deleting network policy")
s.l.WithField("key", fmt.Sprintf("%s/%s", key.Namespace, key.Name)).Info("deleting network policy")
delete(s.NetworkPolicies, key)

// have to recalculate all policies
Expand Down

0 comments on commit 0297485

Please sign in to comment.