From 15c0da53b5adbdaa3366456f6827499374adc5d5 Mon Sep 17 00:00:00 2001 From: Quang Nguyen Date: Fri, 6 Sep 2024 17:09:16 -0400 Subject: [PATCH] feat(conntrack): conntrack integration with packetparser (#624) # Description Part 2 of https://github.com/microsoft/retina/pull/610 ## Related Issue If this pull request is related to any issue, please mention it here. Additionally, make sure that the issue is assigned to you before submitting this pull request. ## Checklist - [ ] I have read the [contributing documentation](https://retina.sh/docs/contributing). - [ ] I signed and signed-off the commits (`git commit -S -s ...`). See [this documentation](https://docs.github.com/en/authentication/managing-commit-signature-verification/about-commit-signature-verification) on signing commits. - [ ] I have correctly attributed the author(s) of the code. - [ ] I have tested the changes locally. - [ ] I have followed the project's style guidelines. - [ ] I have updated the documentation, if necessary. - [ ] I have added tests, if applicable. ## Screenshots (if applicable) or Testing Completed Output from debug CLI tool: ![image](https://github.com/user-attachments/assets/4798f877-7931-4d44-8d1f-ca60c4ceda3f) Hubble flow logs: ![image](https://github.com/user-attachments/assets/10dff07f-24cc-4587-b18f-28f748fa0c33) ## Additional Notes Add any additional notes or context about the pull request here. --- Please refer to the [CONTRIBUTING.md](../CONTRIBUTING.md) file for more information on how to contribute to this project. --------- Signed-off-by: Quang Nguyen --- pkg/bpf/setup_linux.go | 15 ++ pkg/hubble/parser/layer34/parser_linux.go | 51 ------ pkg/managers/pluginmanager/pluginmanager.go | 13 +- .../pluginmanager/pluginmanager_test.go | 10 +- pkg/module/metrics/latency_test.go | 4 +- pkg/plugin/common/constants.go | 2 + pkg/plugin/conntrack/_cprog/conntrack.c | 140 ++++++++-------- pkg/plugin/conntrack/_cprog/conntrack.h | 2 +- pkg/plugin/conntrack/conntrack_linux.go | 50 ++++-- pkg/plugin/conntrack/conntrack_windows.go | 6 +- pkg/plugin/conntrack/types_linux.go | 2 - pkg/plugin/dns/dns_linux.go | 3 +- pkg/plugin/dropreason/dropreason_linux.go | 4 + pkg/plugin/dropreason/kprobe_bpfel_arm64.go | 2 +- pkg/plugin/dropreason/kprobe_bpfel_x86.go | 2 +- pkg/plugin/packetparser/_cprog/dynamic.h | 4 +- pkg/plugin/packetparser/_cprog/packetparser.c | 152 ++++++++---------- pkg/plugin/packetparser/_cprog/packetparser.h | 9 +- .../packetparser/packetparser_bpfel_arm64.go | 41 +++-- .../packetparser/packetparser_bpfel_x86.go | 42 +++-- pkg/plugin/packetparser/packetparser_linux.go | 60 +++++-- .../packetparser/packetparser_linux_test.go | 14 +- pkg/plugin/packetparser/types_linux.go | 11 ++ pkg/plugin/tcpretrans/tcpretrans_linux.go | 1 + pkg/utils/flow_utils.go | 37 ++--- pkg/utils/metadata_linux.pb.go | 8 +- pkg/utils/metadata_windows.pb.go | 38 +---- pkg/utils/utils_linux_test.go | 22 +-- 28 files changed, 399 insertions(+), 346 deletions(-) diff --git a/pkg/bpf/setup_linux.go b/pkg/bpf/setup_linux.go index f7a2fb1e0d..a04cbdf172 100644 --- a/pkg/bpf/setup_linux.go +++ b/pkg/bpf/setup_linux.go @@ -9,6 +9,7 @@ import ( "github.com/cilium/cilium/pkg/mountinfo" plugincommon "github.com/microsoft/retina/pkg/plugin/common" + "github.com/microsoft/retina/pkg/plugin/conntrack" "github.com/microsoft/retina/pkg/plugin/filter" "github.com/pkg/errors" "go.uber.org/zap" @@ -75,5 +76,19 @@ func Setup(l *zap.Logger) error { return errors.Wrap(err, "failed to initialize filter map") } l.Info("Filter map initialized successfully", zap.String("path", plugincommon.MapPath), zap.String("Map name", plugincommon.FilterMapName)) + + // Delete existing conntrack map file. + err = os.Remove(plugincommon.MapPath + "/" + plugincommon.ConntrackMapName) + if err != nil && !os.IsNotExist(err) { + return errors.Wrap(err, "failed to delete existing conntrack map file") + } + l.Info("Deleted existing conntrack map file", zap.String("path", plugincommon.MapPath), zap.String("Map name", plugincommon.ConntrackMapName)) + // Initialize the conntrack map. + // This will create the conntrack map in kernel and pin it to /sys/fs/bpf. + err = conntrack.Init() + if err != nil { + return errors.Wrap(err, "failed to initialize conntrack map") + } + l.Info("Conntrack map initialized successfully", zap.String("path", plugincommon.MapPath), zap.String("Map name", plugincommon.ConntrackMapName)) return nil } diff --git a/pkg/hubble/parser/layer34/parser_linux.go b/pkg/hubble/parser/layer34/parser_linux.go index c094cddf55..f66ae9fe23 100644 --- a/pkg/hubble/parser/layer34/parser_linux.go +++ b/pkg/hubble/parser/layer34/parser_linux.go @@ -10,7 +10,6 @@ import ( "github.com/microsoft/retina/pkg/utils" "github.com/sirupsen/logrus" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/wrapperspb" ) type Parser struct { @@ -51,15 +50,9 @@ func (p *Parser) Decode(f *flow.Flow) *flow.Flow { f.Source = p.ep.Decode(sourceIP) f.Destination = p.ep.Decode(destIP) - // Add IsReply to flow. - p.decodeIsReply(f) - // Add L34 Summary to flow. p.decodeSummary(f) - // Add TrafficDirection to flow. - p.decodeTrafficDirection(f) - return f } @@ -89,47 +82,3 @@ func (p *Parser) decodeSummary(f *flow.Flow) { } } } - -// decodeIsReply sets the flow's IsReply field. -// Heuristic: If the flow has a TCP ACK flag, it is a reply. -// TODO: In future, the dataplane would need to maintain a contrack table -// to determine if a flow is a reply. -// Ref: https://github.com/cilium/cilium/blob/840cc579b7b5aac24ba00c4d8c8f1d10334882fa/bpf/lib/conntrack_map.h#L5 -func (p *Parser) decodeIsReply(f *flow.Flow) { - // Not applicable for DROPPED verdicts. - if f.GetVerdict() == flow.Verdict_DROPPED { - f.IsReply = nil - return - } - - if f.GetL4() != nil && f.GetL4().GetProtocol() != nil { - switch f.GetL4().GetProtocol().(type) { // nolint:gocritic - case *flow.Layer4_TCP: - tcpFlags := f.GetL4().GetTCP().GetFlags() - if tcpFlags != nil { - f.IsReply = &wrapperspb.BoolValue{Value: tcpFlags.GetACK()} - } - } - } -} - -// decodeTrafficDirection decodes the traffic direction of the flow. -// It is only required for DROPPED verdicts because dropreason bpf program -// cannot determine the traffic direction. We determine using the source endpoint's -// node IP. -// Note: If the source and destination are on the same node, then the traffic is outbound. -func (p *Parser) decodeTrafficDirection(f *flow.Flow) { - // Only required for DROPPED verdicts. - if f.GetVerdict() != flow.Verdict_DROPPED { - return - } - - // If the source EP's node is the same as the current node, then the traffic is outbound. - if p.ep.IsEndpointOnLocalHost(f.GetIP().GetSource()) { - f.TrafficDirection = flow.TrafficDirection_EGRESS - return - } - - // Default to ingress. - f.TrafficDirection = flow.TrafficDirection_INGRESS -} diff --git a/pkg/managers/pluginmanager/pluginmanager.go b/pkg/managers/pluginmanager/pluginmanager.go index 079bd7f14b..17f22d18ab 100644 --- a/pkg/managers/pluginmanager/pluginmanager.go +++ b/pkg/managers/pluginmanager/pluginmanager.go @@ -14,6 +14,7 @@ import ( "github.com/microsoft/retina/pkg/managers/watchermanager" "github.com/microsoft/retina/pkg/metrics" "github.com/microsoft/retina/pkg/plugin/api" + "github.com/microsoft/retina/pkg/plugin/conntrack" "github.com/microsoft/retina/pkg/plugin/registry" "github.com/microsoft/retina/pkg/telemetry" "github.com/pkg/errors" @@ -155,8 +156,18 @@ func (p *PluginManager) Start(ctx context.Context) error { } } - // start all plugins g, ctx := errgroup.WithContext(ctx) + + // run conntrack GC + ct, err := conntrack.New() + if err != nil { + return errors.Wrap(err, "failed to get conntrack instance") + } + g.Go(func() error { + return errors.Wrapf(ct.Run(ctx), "failed to run conntrack GC") + }) + + // start all plugins for _, plugin := range p.plugins { plug := plugin diff --git a/pkg/managers/pluginmanager/pluginmanager_test.go b/pkg/managers/pluginmanager/pluginmanager_test.go index b0beb55cb8..0c25079dfd 100644 --- a/pkg/managers/pluginmanager/pluginmanager_test.go +++ b/pkg/managers/pluginmanager/pluginmanager_test.go @@ -5,6 +5,7 @@ package pluginmanager import ( "context" "errors" + "strings" "testing" "time" @@ -132,7 +133,14 @@ func TestNewManagerStart(t *testing.T) { go func() { err = mgr.Start(ctx) - require.Nil(t, err, "Expected nil but got error:%w", err) + if err != nil { + // Ignore errors related to conntrack GC as it is not relevant to this test and it is expected to fail + if strings.Contains(err.Error(), "failed to get conntrack instance") || strings.Contains(err.Error(), "failed to run conntrack GC") { + t.Logf("Ignoring error: %v", err) + } else { + assert.NoError(t, err, "Expected nil but got error:%v", err) //nolint:testifylint // no reason not to use assert here + } + } }() time.Sleep(1 * time.Second) diff --git a/pkg/module/metrics/latency_test.go b/pkg/module/metrics/latency_test.go index dc9d88827b..fcdbfee87c 100644 --- a/pkg/module/metrics/latency_test.go +++ b/pkg/module/metrics/latency_test.go @@ -121,7 +121,7 @@ func TestProcessFlow(t *testing.T) { * Test case 1: TCP handshake. */ // Node -> Api server. - f1 := utils.ToFlow(t1, apiSeverIp, nodeIp, 80, 443, 6, 3, 0) + f1 := utils.ToFlow(l, t1, apiSeverIp, nodeIp, 80, 443, 6, 3, 0) metaf1 := &utils.RetinaMetadata{} utils.AddTCPID(metaf1, 1234) utils.AddTCPFlags(f1, 1, 0, 0, 0, 0, 0) @@ -131,7 +131,7 @@ func TestProcessFlow(t *testing.T) { } // Api server -> Node. - f2 := utils.ToFlow(t2, nodeIp, apiSeverIp, 443, 80, 6, 2, 0) + f2 := utils.ToFlow(l, t2, nodeIp, apiSeverIp, 443, 80, 6, 2, 0) metaf2 := &utils.RetinaMetadata{} utils.AddTCPID(metaf2, 1234) utils.AddTCPFlags(f2, 1, 1, 0, 0, 0, 0) diff --git a/pkg/plugin/common/constants.go b/pkg/plugin/common/constants.go index 0e6b435636..dafd1c8181 100644 --- a/pkg/plugin/common/constants.go +++ b/pkg/plugin/common/constants.go @@ -7,4 +7,6 @@ const ( MapPath = "/sys/fs/bpf" // FilterMapName is the name of the BPF filter map FilterMapName = "retina_filter_map" + // ConntrackMapName is the name of the BPF conntrack map + ConntrackMapName = "retina_conntrack_map" ) diff --git a/pkg/plugin/conntrack/_cprog/conntrack.c b/pkg/plugin/conntrack/_cprog/conntrack.c index faf71d645f..512ae7bc2c 100644 --- a/pkg/plugin/conntrack/_cprog/conntrack.c +++ b/pkg/plugin/conntrack/_cprog/conntrack.c @@ -6,6 +6,30 @@ #include "bpf_helpers.h" #include "conntrack.h" +struct tcpmetadata { + __u32 seq; // TCP sequence number + __u32 ack_num; // TCP ack number + __u32 tsval; // TCP timestamp value + __u32 tsecr; // TCP timestamp echo reply +}; + + +struct packet +{ + __u64 t_nsec; // timestamp in nanoseconds + __u32 bytes; // packet size in bytes + __u32 src_ip; + __u32 dst_ip; + __u16 src_port; + __u16 dst_port; + struct tcpmetadata tcp_metadata; // TCP metadata + __u8 observation_point; + __u8 traffic_direction; + __u8 proto; + __u8 flags; // For TCP packets, this is the TCP flags. For UDP packets, this is will always be 1 for conntrack purposes. + bool is_reply; +}; + /** * The structure representing an ipv4 5-tuple key in the connection tracking map. @@ -84,17 +108,16 @@ static __always_inline __u8 _ct_get_traffic_direction(__u8 observation_point) { * @arg key The key to be used to create the new connection. * @arg flags The flags of the packet. * @arg observation_point The point in the network stack where the packet is observed. - * @arg timeout The timeout for the connection. */ -static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, __u8 flags, __u8 observation_point, __u64 timeout) { +static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, __u8 flags, __u8 observation_point) { struct ct_entry new_value; __builtin_memset(&new_value, 0, sizeof(struct ct_entry)); __u64 now = bpf_mono_now(); // Check for overflow - if (timeout > UINT64_MAX - now) { + if (CT_SYN_TIMEOUT > UINT32_MAX - now) { return false; } - new_value.eviction_time = now + timeout; + new_value.eviction_time = now + CT_SYN_TIMEOUT; new_value.flags_seen_tx_dir = flags; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); bpf_map_update_elem(&retina_conntrack_map, &key, &new_value, BPF_ANY); @@ -103,38 +126,44 @@ static __always_inline bool _ct_create_new_tcp_connection(struct ct_v4_key key, /** * Create a new UDP connection. + * @arg *p pointer to the packet to be processed. * @arg key The key to be used to create the new connection. - * @arg flags The flags of the packet. * @arg observation_point The point in the network stack where the packet is observed. */ -static __always_inline bool _ct_handle_udp_connection(struct ct_v4_key key, __u8 flags, __u8 observation_point) { +static __always_inline bool _ct_handle_udp_connection(struct packet *p, struct ct_v4_key key, __u8 observation_point) { struct ct_entry new_value; __builtin_memset(&new_value, 0, sizeof(struct ct_entry)); __u64 now = bpf_mono_now(); // Check for overflow - if (CT_CONNECTION_LIFETIME_NONTCP > UINT64_MAX - now) { + if (CT_CONNECTION_LIFETIME_NONTCP > UINT32_MAX - now) { return false; } new_value.eviction_time = now + CT_CONNECTION_LIFETIME_NONTCP; - new_value.flags_seen_tx_dir = flags; + new_value.flags_seen_tx_dir = p->flags; new_value.last_report_tx_dir = now; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); bpf_map_update_elem(&retina_conntrack_map, &key, &new_value, BPF_ANY); + // Update packet + p->is_reply = false; + p->traffic_direction = new_value.traffic_direction; return true; } /** * Handle a TCP connection. + * @arg *p pointer to the packet to be processed. * @arg key The key to be used to handle the connection. * @arg reverse_key The reverse key to be used to handle the connection. - * @arg flags The flags of the packet. * @arg observation_point The point in the network stack where the packet is observed. */ -static __always_inline bool _ct_handle_tcp_connection(struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 flags, __u8 observation_point) { +static __always_inline bool _ct_handle_tcp_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) { // Check if the packet is a SYN packet. - if (flags & TCP_SYN) { + if (p->flags & TCP_SYN) { + // Update packet accordingly. + p->is_reply = false; + p->traffic_direction = _ct_get_traffic_direction(observation_point); // Create a new connection with a timeout of CT_SYN_TIMEOUT. - return _ct_create_new_tcp_connection(key, flags, observation_point, CT_SYN_TIMEOUT); + return _ct_create_new_tcp_connection(key, p->flags, observation_point); } // The packet is not a SYN packet and the connection corresponding to this packet is not found. @@ -144,20 +173,23 @@ static __always_inline bool _ct_handle_tcp_connection(struct ct_v4_key key, stru __builtin_memset(&new_value, 0, sizeof(struct ct_entry)); __u64 now = bpf_mono_now(); // Check for overflow - if (CT_CONNECTION_LIFETIME_TCP > UINT64_MAX - now) { + if (CT_CONNECTION_LIFETIME_TCP > UINT32_MAX - now) { return false; } new_value.eviction_time = now + CT_CONNECTION_LIFETIME_TCP; - new_value.is_closing = (flags & (TCP_FIN | TCP_RST)) != 0x0; + new_value.is_closing = (p->flags & (TCP_FIN | TCP_RST)) != 0x0; new_value.traffic_direction = _ct_get_traffic_direction(observation_point); + p->traffic_direction = new_value.traffic_direction; // Check for ACK flag. If the ACK flag is set, the packet is considered as a packet in the reply direction of the connection. - if (flags & TCP_ACK) { - new_value.flags_seen_rx_dir = flags; + if (p->flags & TCP_ACK) { + p->is_reply = true; + new_value.flags_seen_rx_dir = p->flags; new_value.last_report_rx_dir = now; bpf_map_update_elem(&retina_conntrack_map, &reverse_key, &new_value, BPF_ANY); } else { // Otherwise, the packet is considered as a packet in the send direction. - new_value.flags_seen_tx_dir = flags; + p->is_reply = false; + new_value.flags_seen_tx_dir = p->flags; new_value.last_report_tx_dir = now; bpf_map_update_elem(&retina_conntrack_map, &key, &new_value, BPF_ANY); } @@ -166,16 +198,16 @@ static __always_inline bool _ct_handle_tcp_connection(struct ct_v4_key key, stru /** * Handle a new connection. + * @arg *p pointer to the packet to be processed. * @arg key The key to be used to handle the connection. * @arg reverse_key The reverse key to be used to handle the connection. - * @arg flags The flags of the packet. * @arg observation_point The point in the network stack where the packet is observed. */ -static __always_inline bool _ct_handle_new_connection(struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 flags, __u8 observation_point) { +static __always_inline bool _ct_handle_new_connection(struct packet *p, struct ct_v4_key key, struct ct_v4_key reverse_key, __u8 observation_point) { if (key.proto & IPPROTO_TCP) { - return _ct_handle_tcp_connection(key, reverse_key, flags, observation_point); + return _ct_handle_tcp_connection(p, key, reverse_key, observation_point); } else if (key.proto & IPPROTO_UDP) { - return _ct_handle_udp_connection(key, flags, observation_point); + return _ct_handle_udp_connection(p, key, observation_point); } else { return false; // We are not interested in other protocols. } @@ -225,12 +257,12 @@ static __always_inline bool _ct_should_report_packet(struct ct_entry *entry, __u // Update the eviction time of the connection. if (protocol == IPPROTO_TCP) { // Check for overflow, only update the eviction time if there is no overflow. - if (CT_CONNECTION_LIFETIME_TCP > UINT64_MAX - now) { + if (CT_CONNECTION_LIFETIME_TCP > UINT32_MAX - now) { return false; } WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_TCP); } else { - if (CT_CONNECTION_LIFETIME_NONTCP > UINT64_MAX - now) { + if (CT_CONNECTION_LIFETIME_NONTCP > UINT32_MAX - now) { return false; } WRITE_ONCE(entry->eviction_time, now + CT_CONNECTION_LIFETIME_NONTCP); @@ -251,18 +283,31 @@ static __always_inline bool _ct_should_report_packet(struct ct_entry *entry, __u /** * Process a packet and update the connection tracking map. - * @arg key The key to be used to lookup the connection in the map. - * @arg flags The flags of the packet. + * @arg *p pointer to the packet to be processed. * @arg observation_point The point in the network stack where the packet is observed. * Returns true if the packet should be report to userspace. False otherwise. */ -static __always_inline __attribute__((unused)) bool ct_process_packet(struct ct_v4_key key, __u8 flags, __u8 observation_point) { +static __always_inline __attribute__((unused)) bool ct_process_packet(struct packet *p, __u8 observation_point) { + if (!p) { + return false; + } + // Create a new key for the send direction. + struct ct_v4_key key; + __builtin_memset(&key, 0, sizeof(struct ct_v4_key)); + key.src_ip = p->src_ip; + key.dst_ip = p->dst_ip; + key.src_port = p->src_port; + key.dst_port = p->dst_port; + key.proto = p->proto; // Lookup the connection in the map. struct ct_entry *entry = bpf_map_lookup_elem(&retina_conntrack_map, &key); // If the connection is found in the send direction, update the connection. if (entry) { - return _ct_should_report_packet(entry, flags, CT_PACKET_DIR_TX, key.proto); + // Update the packet accordingly. + p->is_reply = false; + p->traffic_direction = entry->traffic_direction; + return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_TX, key.proto); } // The connection is not found in the send direction. Check the reply direction by reversing the key. @@ -275,43 +320,12 @@ static __always_inline __attribute__((unused)) bool ct_process_packet(struct ct_ // If the connection is found based on the reverse key, meaning that the packet is a reply packet to an existing connection. if (entry) { - return _ct_should_report_packet(entry, flags, CT_PACKET_DIR_RX, key.proto); + // Update the packet accordingly. + p->is_reply = true; + p->traffic_direction = entry->traffic_direction; + return _ct_should_report_packet(entry, p->flags, CT_PACKET_DIR_RX, key.proto); } // If the connection is still not found, the connection is new. - return _ct_handle_new_connection(key, reverse_key, flags, observation_point); -} - -/** - * Check if a packet is a reply packet to a connection. - * @arg key The key to be used to check if the packet is a reply packet. - */ -static __always_inline __attribute__((unused)) bool ct_is_reply_packet(struct ct_v4_key key) { - // Lookup the connection in the map. - struct ct_entry *entry = bpf_map_lookup_elem(&retina_conntrack_map, &key); - // We return false here because we found the connection in the send direction - // meaning that the packet is coming from the initiator of the connection and therefore not a reply packet. - return entry == NULL; -} - -/** - * Get the traffic direction of a connection. - * @arg key The key to be used to get the traffic direction of the connection. - */ -static __always_inline __attribute__((unused)) __u8 ct_get_traffic_direction(struct ct_v4_key key) { - // Lookup the connection in the map. - struct ct_entry *entry = bpf_map_lookup_elem(&retina_conntrack_map, &key); - if (entry) { - return entry->traffic_direction; - } - // Construct the reverse key. - struct ct_v4_key reverse_key; - __builtin_memset(&reverse_key, 0, sizeof(struct ct_v4_key)); - _ct_reverse_key(&reverse_key, &key); - // Lookup the connection in the map based on the reverse key. - entry = bpf_map_lookup_elem(&retina_conntrack_map, &reverse_key); - if (entry) { - return entry->traffic_direction; - } - return TRAFFIC_DIRECTION_UNKNOWN; + return _ct_handle_new_connection(p, key, reverse_key, observation_point); } diff --git a/pkg/plugin/conntrack/_cprog/conntrack.h b/pkg/plugin/conntrack/_cprog/conntrack.h index 8ab43db24c..4c87acd473 100644 --- a/pkg/plugin/conntrack/_cprog/conntrack.h +++ b/pkg/plugin/conntrack/_cprog/conntrack.h @@ -13,7 +13,7 @@ ({ __u64 __x = bpf_ktime_get_boot_ns() / NSEC_PER_SEC; __x; }) # define bpf_mono_now() bpf_ktime_get_sec() -#define UINT64_MAX 18446744073709551615ULL +#define UINT32_MAX 4294967295U // Time units in seconds diff --git a/pkg/plugin/conntrack/conntrack_linux.go b/pkg/plugin/conntrack/conntrack_linux.go index 06cb05d0c2..69a7ca3b36 100644 --- a/pkg/plugin/conntrack/conntrack_linux.go +++ b/pkg/plugin/conntrack/conntrack_linux.go @@ -9,8 +9,8 @@ import ( "time" "github.com/cilium/ebpf" + "github.com/cilium/ebpf/rlimit" "github.com/microsoft/retina/internal/ktime" - "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/log" plugincommon "github.com/microsoft/retina/pkg/plugin/common" _ "github.com/microsoft/retina/pkg/plugin/conntrack/_cprog" // nolint // This is needed so cprog is included when vendoring @@ -19,22 +19,34 @@ import ( "go.uber.org/zap" ) -// New creates a packetparser plugin. -// //go:generate go run github.com/cilium/ebpf/cmd/bpf2go@master -cflags "-g -O2 -Wall -D__TARGET_ARCH_${GOARCH} -Wall" -target ${GOARCH} -type ct_v4_key conntrack ./_cprog/conntrack.c -- -I../lib/_${GOARCH} -I../lib/common/libbpf/_src -I../lib/common/libbpf/_include/linux -I../lib/common/libbpf/_include/uapi/linux -I../lib/common/libbpf/_include/asm -func New(cfg *config.Config) *Conntrack { - return &Conntrack{ - l: log.Logger().Named("conntrack"), - gcFrequency: defaultGCFrequency, - cfg: cfg, + +// Init initializes the conntrack eBPF map in the kernel for the first time. +// This function should be called in the init container since +// it requires securityContext.privileged to be true. +func Init() error { + // Allow the current process to lock memory for eBPF resources. + if err := rlimit.RemoveMemlock(); err != nil { + return errors.Wrapf(err, "failed to remove memlock limit") + } + + objs := &conntrackObjects{} + err := loadConntrackObjects(objs, &ebpf.CollectionOptions{ + Maps: ebpf.MapOptions{ + PinPath: plugincommon.MapPath, + }, + }) + if err != nil { + return errors.Wrap(err, "failed to load conntrack objects") } + return nil } -// Run starts the Conntrack garbage collection loop. -func (ct *Conntrack) Run(ctx context.Context) error { - if ct.cfg.DataAggregationLevel == config.Low { - ct.l.Info("conntrack is disabled in low data aggregation level") - return nil +// New returns a new Conntrack instance. +func New() (*Conntrack, error) { + ct := &Conntrack{ + l: log.Logger().Named("conntrack"), + gcFrequency: defaultGCFrequency, } objs := &conntrackObjects{} @@ -45,23 +57,26 @@ func (ct *Conntrack) Run(ctx context.Context) error { }) if err != nil { ct.l.Error("loadConntrackObjects failed", zap.Error(err)) - return errors.Wrap(err, "failed to load conntrack objects") + return nil, errors.Wrap(err, "failed to load conntrack objects") } ct.objs = objs - // Get the conntrack map from the objects ct.ctMap = objs.RetinaConntrackMap + return ct, nil +} +// Run starts the Conntrack garbage collection loop. +func (ct *Conntrack) Run(ctx context.Context) error { ticker := time.NewTicker(ct.gcFrequency) defer ticker.Stop() - ct.l.Info("starting Conntrack GC loop") + ct.l.Info("Starting Conntrack GC loop") for { select { case <-ctx.Done(): - ct.l.Info("stopping conntrack GC loop") + ct.l.Info("Stopping conntrack GC loop") if ct.objs != nil { err := ct.objs.Close() if err != nil { @@ -80,6 +95,7 @@ func (ct *Conntrack) Run(ctx context.Context) error { iter := ct.ctMap.Iterate() for iter.Next(&key, &value) { noOfCtEntries++ + // Check if the connection is closing or has expired if value.IsClosing || ktime.MonotonicOffset.Seconds()+float64(value.EvictionTime) < float64((time.Now().Unix())) { // Iterating a hash map from which keys are being deleted is not safe. // So, we store the keys to be deleted in a list and delete them after the iteration. diff --git a/pkg/plugin/conntrack/conntrack_windows.go b/pkg/plugin/conntrack/conntrack_windows.go index d9647e7511..df674e2fd5 100644 --- a/pkg/plugin/conntrack/conntrack_windows.go +++ b/pkg/plugin/conntrack/conntrack_windows.go @@ -2,15 +2,13 @@ package conntrack import ( "context" - - "github.com/microsoft/retina/pkg/config" ) type Conntrack struct{} // Not implemented for Windows -func New(_ *config.Config) *Conntrack { - return &Conntrack{} +func New() (*Conntrack, error) { + return &Conntrack{}, nil } // Not implemented for Windows diff --git a/pkg/plugin/conntrack/types_linux.go b/pkg/plugin/conntrack/types_linux.go index 0a7845b9d1..637fcade4a 100644 --- a/pkg/plugin/conntrack/types_linux.go +++ b/pkg/plugin/conntrack/types_linux.go @@ -5,7 +5,6 @@ import ( "time" "github.com/cilium/ebpf" - "github.com/microsoft/retina/pkg/config" "github.com/microsoft/retina/pkg/log" ) @@ -18,7 +17,6 @@ type Conntrack struct { objs *conntrackObjects ctMap *ebpf.Map gcFrequency time.Duration - cfg *config.Config } // Define TCP flag constants diff --git a/pkg/plugin/dns/dns_linux.go b/pkg/plugin/dns/dns_linux.go index e481b967c1..a27b000415 100644 --- a/pkg/plugin/dns/dns_linux.go +++ b/pkg/plugin/dns/dns_linux.go @@ -113,7 +113,7 @@ func (d *dns) eventHandler(event *types.Event) { return } - var dir uint32 + var dir uint8 if event.PktType == "HOST" { // Ingress. dir = 2 @@ -126,6 +126,7 @@ func (d *dns) eventHandler(event *types.Event) { // Update advanced metrics. fl := utils.ToFlow( + d.l, int64(event.Timestamp), net.ParseIP(event.SrcIP), net.ParseIP(event.DstIP), diff --git a/pkg/plugin/dropreason/dropreason_linux.go b/pkg/plugin/dropreason/dropreason_linux.go index f8a4440cb6..843b6ca8a9 100644 --- a/pkg/plugin/dropreason/dropreason_linux.go +++ b/pkg/plugin/dropreason/dropreason_linux.go @@ -352,6 +352,7 @@ func (dr *dropReason) processRecord(ctx context.Context, id int) { destinationPortShort := uint32(utils.HostToNetShort(bpfEvent.DstPort)) fl := utils.ToFlow( + dr.l, ktime.MonotonicOffset.Nanoseconds()+int64(bpfEvent.Ts), utils.Int2ip(bpfEvent.SrcIp).To4(), // Precautionary To4() call. utils.Int2ip(bpfEvent.DstIp).To4(), // Precautionary To4() call. @@ -366,6 +367,9 @@ func (dr *dropReason) processRecord(ctx context.Context, id int) { continue } + // IsReply is not applicable for DROPPED verdicts. + fl.IsReply = nil + meta := &utils.RetinaMetadata{} // Add drop reason to the flow's metadata. diff --git a/pkg/plugin/dropreason/kprobe_bpfel_arm64.go b/pkg/plugin/dropreason/kprobe_bpfel_arm64.go index d072c80c17..a3f95f48a1 100644 --- a/pkg/plugin/dropreason/kprobe_bpfel_arm64.go +++ b/pkg/plugin/dropreason/kprobe_bpfel_arm64.go @@ -20,9 +20,9 @@ type kprobeMapKey struct { } type kprobeMetricsMapKey struct { - ReturnVal uint32 DropType uint16 _ [2]byte + ReturnVal uint32 } type kprobeMetricsMapValue struct { diff --git a/pkg/plugin/dropreason/kprobe_bpfel_x86.go b/pkg/plugin/dropreason/kprobe_bpfel_x86.go index 7bf8f59a01..224e5e83a1 100644 --- a/pkg/plugin/dropreason/kprobe_bpfel_x86.go +++ b/pkg/plugin/dropreason/kprobe_bpfel_x86.go @@ -20,9 +20,9 @@ type kprobeMapKey struct { } type kprobeMetricsMapKey struct { - ReturnVal uint32 DropType uint16 _ [2]byte + ReturnVal uint32 } type kprobeMetricsMapValue struct { diff --git a/pkg/plugin/packetparser/_cprog/dynamic.h b/pkg/plugin/packetparser/_cprog/dynamic.h index 80abbd931f..ecadc42211 100644 --- a/pkg/plugin/packetparser/_cprog/dynamic.h +++ b/pkg/plugin/packetparser/_cprog/dynamic.h @@ -1,2 +1,2 @@ -// Place holder header file that will be replaced by the actual header file during runtime -// DO NOT DELETE +#define BYPASS_LOOKUP_IP_OF_INTEREST 0 +#define DATA_AGGREGATION_LEVEL 0 diff --git a/pkg/plugin/packetparser/_cprog/packetparser.c b/pkg/plugin/packetparser/_cprog/packetparser.c index 935aa2095a..b130fbde88 100644 --- a/pkg/plugin/packetparser/_cprog/packetparser.c +++ b/pkg/plugin/packetparser/_cprog/packetparser.c @@ -5,41 +5,13 @@ #include "bpf_helpers.h" #include "bpf_endian.h" #include "packetparser.h" +#include "conntrack.c" +#include "conntrack.h" #include "retina_filter.c" #include "dynamic.h" char __license[] SEC("license") = "Dual MIT/GPL"; - - -struct tcpmetadata { - __u32 seq; // TCP sequence number - __u32 ack_num; // TCP ack number - // TCP flags. - __u16 syn; - __u16 ack; - __u16 fin; - __u16 rst; - __u16 psh; - __u16 urg; - __u32 tsval; // TCP timestamp value - __u32 tsecr; // TCP timestamp echo reply -}; - -struct packet -{ - // 5 tuple. - __u32 src_ip; - __u32 dst_ip; - __u16 src_port; - __u16 dst_port; - __u8 proto; - struct tcpmetadata tcp_metadata; // TCP metadata - direction dir; // 0 -> INGRESS, 1 -> EGRESS - __u64 ts; // timestamp in nanoseconds - __u32 bytes; // packet size in bytes -}; - struct { __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); @@ -92,64 +64,64 @@ static int parse_tcp_ts(struct tcphdr *tcph, void *data_end, __u32 *tsval, __u32 // Loop through the options field to find the TSval and TSecr values. // MAX_TCP_OPTIONS_LEN is used to prevent infinite loops and the fact that the options field is at most 40 bytes long. -#pragma unroll + #pragma unroll for (i = 0; i < MAX_TCP_OPTIONS_LEN; i++) { // Verify that adding 1 to the current pointer will not go past the end of the packet. if (tcp_options_cur_ptr + 1 > (__u8 *)tcp_opt_end_ptr || tcp_options_cur_ptr + 1 > (__u8 *)data_end) { return -1; } - // Dereference the pointer to get the option kind. - opt_kind = *tcp_options_cur_ptr; - // switch case to check the option kind. - switch (opt_kind) { - case 0: - // End of options list. - return -1; - case 1: - // No operation. - tcp_options_cur_ptr++; - continue; - default: - // Some kind of option. - // Since each option is at least 2 bytes long, we need to check that adding 2 to the pointer will not go past the end of the packet. - if (tcp_options_cur_ptr + 2 > tcp_opt_end_ptr || tcp_options_cur_ptr + 2 > (__u8 *)data_end) { - return -1; - } - // Get the length of the option. - opt_len = *(tcp_options_cur_ptr + 1); - // Check that the option length is valid. It should be at least 2 bytes long. - if (opt_len < 2) { + // Dereference the pointer to get the option kind. + opt_kind = *tcp_options_cur_ptr; + // switch case to check the option kind. + switch (opt_kind) { + case 0: + // End of options list. return -1; - } - // Check if the option is the timestamp option. The timestamp option has a kind of 8 and a length of 10 bytes. - if (opt_kind == 8 && opt_len == 10) { - // Verify that adding the option's length to the pointer will not go past the end of the packet. - if (tcp_options_cur_ptr + 10 > tcp_opt_end_ptr || tcp_options_cur_ptr + 10 > (__u8 *)data_end) { + case 1: + // No operation. + tcp_options_cur_ptr++; + continue; + default: + // Some kind of option. + // Since each option is at least 2 bytes long, we need to check that adding 2 to the pointer will not go past the end of the packet. + if (tcp_options_cur_ptr + 2 > tcp_opt_end_ptr || tcp_options_cur_ptr + 2 > (__u8 *)data_end) { return -1; } - // Found the TSval and TSecr values. Store them in the tsval and tsecr pointers. - *tsval = bpf_ntohl(*(__u32 *)(tcp_options_cur_ptr + 2)); - *tsecr = bpf_ntohl(*(__u32 *)(tcp_options_cur_ptr + 6)); - - return 0; - } - // Move the pointer to the next option. - tcp_options_cur_ptr += opt_len; - } - } + // Get the length of the option. + opt_len = *(tcp_options_cur_ptr + 1); + // Check that the option length is valid. It should be at least 2 bytes long. + if (opt_len < 2) { + return -1; + } + // Check if the option is the timestamp option. The timestamp option has a kind of 8 and a length of 10 bytes. + if (opt_kind == 8 && opt_len == 10) { + // Verify that adding the option's length to the pointer will not go past the end of the packet. + if (tcp_options_cur_ptr + 10 > tcp_opt_end_ptr || tcp_options_cur_ptr + 10 > (__u8 *)data_end) { + return -1; + } + // Found the TSval and TSecr values. Store them in the tsval and tsecr pointers. + *tsval = bpf_ntohl(*(__u32 *)(tcp_options_cur_ptr + 2)); + *tsecr = bpf_ntohl(*(__u32 *)(tcp_options_cur_ptr + 6)); + + return 0; + } + // Move the pointer to the next option. + tcp_options_cur_ptr += opt_len; + } + } return -1; } // Function to parse the packet and send it to the perf buffer. -static void parse(struct __sk_buff *skb, direction d) +static void parse(struct __sk_buff *skb, __u8 obs) { struct packet p; __builtin_memset(&p, 0, sizeof(p)); // Get current time in nanoseconds. - p.ts = bpf_ktime_get_boot_ns(); + p.t_nsec = bpf_ktime_get_boot_ns(); - p.dir = d; + p.observation_point = obs; p.bytes = skb->len; void *data_end = (void *)(unsigned long long)skb->data_end; @@ -182,6 +154,7 @@ static void parse(struct __sk_buff *skb, direction d) } #endif #endif + // Get source and destination ports. if (ip->protocol == IPPROTO_TCP) { @@ -192,19 +165,16 @@ static void parse(struct __sk_buff *skb, direction d) p.src_port = tcp->source; p.dst_port = tcp->dest; + // Get TCP metadata. struct tcpmetadata tcp_metadata; __builtin_memset(&tcp_metadata, 0, sizeof(tcp_metadata)); + // Get all TCP flags. + p.flags = (tcp->fin << 0) | (tcp->syn << 1) | (tcp->rst << 2) | (tcp->psh << 3) | (tcp->ack << 4) | (tcp->urg << 5) | (tcp->ece << 6) | (tcp->cwr << 7); + tcp_metadata.seq = tcp->seq; tcp_metadata.ack_num = tcp->ack_seq; - tcp_metadata.syn = tcp->syn; - tcp_metadata.ack = tcp->ack; - tcp_metadata.fin = tcp->fin; - tcp_metadata.rst = tcp->rst; - tcp_metadata.psh = tcp->psh; - tcp_metadata.urg = tcp->urg; - p.tcp_metadata = tcp_metadata; // Get TSval/TSecr from TCP header. @@ -221,13 +191,31 @@ static void parse(struct __sk_buff *skb, direction d) p.src_port = udp->source; p.dst_port = udp->dest; + + p.flags = 1; } else { return; } - bpf_perf_event_output(skb, &packetparser_events, BPF_F_CURRENT_CPU, &p, sizeof(p)); + + // Process the packet in ct + bool report __attribute__((unused)); + report = ct_process_packet(&p, obs); + #ifdef DATA_AGGREGATION_LEVEL + // If the data aggregation level is low, always send the packet to the perf buffer. + #if DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_LOW + bpf_perf_event_output(skb, &packetparser_events, BPF_F_CURRENT_CPU, &p, sizeof(p)); + return; + // If the data aggregation level is high, only send the packet to the perf buffer if it needs to be reported. + #elif DATA_AGGREGATION_LEVEL == DATA_AGGREGATION_LEVEL_HIGH + if (report) { + bpf_perf_event_output(skb, &packetparser_events, BPF_F_CURRENT_CPU, &p, sizeof(p)); + } + #endif + #endif + return; } SEC("classifier_endpoint_ingress") @@ -235,7 +223,7 @@ int endpoint_ingress_filter(struct __sk_buff *skb) { // This is attached to the interface on the host side. // So ingress on host is egress on endpoint and vice versa. - parse(skb, FROM_ENDPOINT); + parse(skb, OBSERVATION_POINT_FROM_ENDPOINT); // Always return TC_ACT_UNSPEC to allow packet to pass to the next BPF program. return TC_ACT_UNSPEC; } @@ -245,7 +233,7 @@ int endpoint_egress_filter(struct __sk_buff *skb) { // This is attached to the interface on the host side. // So egress on host is ingress on endpoint and vice versa. - parse(skb, TO_ENDPOINT); + parse(skb, OBSERVATION_POINT_TO_ENDPOINT); // Always return TC_ACT_UNSPEC to allow packet to pass to the next BPF program. return TC_ACT_UNSPEC; } @@ -253,7 +241,7 @@ int endpoint_egress_filter(struct __sk_buff *skb) SEC("classifier_host_ingress") int host_ingress_filter(struct __sk_buff *skb) { - parse(skb, FROM_NETWORK); + parse(skb, OBSERVATION_POINT_FROM_NETWORK); // Always return TC_ACT_UNSPEC to allow packet to pass to the next BPF program. return TC_ACT_UNSPEC; } @@ -261,7 +249,7 @@ int host_ingress_filter(struct __sk_buff *skb) SEC("classifier_host_egress") int host_egress_filter(struct __sk_buff *skb) { - parse(skb, TO_NETWORK); + parse(skb, OBSERVATION_POINT_TO_NETWORK); // Always return TC_ACT_UNSPEC to allow packet to pass to the next BPF program. return TC_ACT_UNSPEC; } diff --git a/pkg/plugin/packetparser/_cprog/packetparser.h b/pkg/plugin/packetparser/_cprog/packetparser.h index d3e8115eab..d60cfdef14 100644 --- a/pkg/plugin/packetparser/_cprog/packetparser.h +++ b/pkg/plugin/packetparser/_cprog/packetparser.h @@ -7,10 +7,5 @@ // tc-bpf return code to execute the next tc-bpf program. #define TC_ACT_UNSPEC (-1) -typedef enum -{ - FROM_ENDPOINT = 0, - TO_ENDPOINT, - FROM_NETWORK, - TO_NETWORK, -} direction; +#define DATA_AGGREGATION_LEVEL_LOW 0 +#define DATA_AGGREGATION_LEVEL_HIGH 1 diff --git a/pkg/plugin/packetparser/packetparser_bpfel_arm64.go b/pkg/plugin/packetparser/packetparser_bpfel_arm64.go index f48fda43c2..30f08a5c97 100644 --- a/pkg/plugin/packetparser/packetparser_bpfel_arm64.go +++ b/pkg/plugin/packetparser/packetparser_bpfel_arm64.go @@ -12,33 +12,49 @@ import ( "github.com/cilium/ebpf" ) +type packetparserCtEntry struct { + EvictionTime uint32 + LastReportTxDir uint32 + LastReportRxDir uint32 + TrafficDirection uint8 + FlagsSeenTxDir uint8 + FlagsSeenRxDir uint8 + IsClosing bool +} + +type packetparserCtV4Key struct { + SrcIp uint32 + DstIp uint32 + SrcPort uint16 + DstPort uint16 + Proto uint8 + _ [3]byte +} + type packetparserMapKey struct { Prefixlen uint32 Data uint32 } type packetparserPacket struct { + T_nsec uint64 + Bytes uint32 SrcIp uint32 DstIp uint32 SrcPort uint16 DstPort uint16 - Proto uint8 - _ [3]byte TcpMetadata struct { Seq uint32 AckNum uint32 - Syn uint16 - Ack uint16 - Fin uint16 - Rst uint16 - Psh uint16 - Urg uint16 Tsval uint32 Tsecr uint32 } - Dir uint32 - Ts uint64 - Bytes uint64 + ObservationPoint uint8 + TrafficDirection uint8 + Proto uint8 + Flags uint8 + IsReply bool + _ [3]byte } // loadPacketparser returns the embedded CollectionSpec for packetparser. @@ -93,6 +109,7 @@ type packetparserProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type packetparserMapSpecs struct { PacketparserEvents *ebpf.MapSpec `ebpf:"packetparser_events"` + RetinaConntrackMap *ebpf.MapSpec `ebpf:"retina_conntrack_map"` RetinaFilterMap *ebpf.MapSpec `ebpf:"retina_filter_map"` } @@ -116,12 +133,14 @@ func (o *packetparserObjects) Close() error { // It can be passed to loadPacketparserObjects or ebpf.CollectionSpec.LoadAndAssign. type packetparserMaps struct { PacketparserEvents *ebpf.Map `ebpf:"packetparser_events"` + RetinaConntrackMap *ebpf.Map `ebpf:"retina_conntrack_map"` RetinaFilterMap *ebpf.Map `ebpf:"retina_filter_map"` } func (m *packetparserMaps) Close() error { return _PacketparserClose( m.PacketparserEvents, + m.RetinaConntrackMap, m.RetinaFilterMap, ) } diff --git a/pkg/plugin/packetparser/packetparser_bpfel_x86.go b/pkg/plugin/packetparser/packetparser_bpfel_x86.go index 1338d43764..10f71606ab 100644 --- a/pkg/plugin/packetparser/packetparser_bpfel_x86.go +++ b/pkg/plugin/packetparser/packetparser_bpfel_x86.go @@ -12,34 +12,49 @@ import ( "github.com/cilium/ebpf" ) +type packetparserCtEntry struct { + EvictionTime uint32 + LastReportTxDir uint32 + LastReportRxDir uint32 + TrafficDirection uint8 + FlagsSeenTxDir uint8 + FlagsSeenRxDir uint8 + IsClosing bool +} + +type packetparserCtV4Key struct { + SrcIp uint32 + DstIp uint32 + SrcPort uint16 + DstPort uint16 + Proto uint8 + _ [3]byte +} + type packetparserMapKey struct { Prefixlen uint32 Data uint32 } type packetparserPacket struct { + T_nsec uint64 + Bytes uint32 SrcIp uint32 DstIp uint32 SrcPort uint16 DstPort uint16 - Proto uint8 - _ [3]byte TcpMetadata struct { Seq uint32 AckNum uint32 - Syn uint16 - Ack uint16 - Fin uint16 - Rst uint16 - Psh uint16 - Urg uint16 Tsval uint32 Tsecr uint32 } - Dir uint32 - Ts uint64 - Bytes uint32 - _ [4]byte + ObservationPoint uint8 + TrafficDirection uint8 + Proto uint8 + Flags uint8 + IsReply bool + _ [3]byte } // loadPacketparser returns the embedded CollectionSpec for packetparser. @@ -94,6 +109,7 @@ type packetparserProgramSpecs struct { // It can be passed ebpf.CollectionSpec.Assign. type packetparserMapSpecs struct { PacketparserEvents *ebpf.MapSpec `ebpf:"packetparser_events"` + RetinaConntrackMap *ebpf.MapSpec `ebpf:"retina_conntrack_map"` RetinaFilterMap *ebpf.MapSpec `ebpf:"retina_filter_map"` } @@ -117,12 +133,14 @@ func (o *packetparserObjects) Close() error { // It can be passed to loadPacketparserObjects or ebpf.CollectionSpec.LoadAndAssign. type packetparserMaps struct { PacketparserEvents *ebpf.Map `ebpf:"packetparser_events"` + RetinaConntrackMap *ebpf.Map `ebpf:"retina_conntrack_map"` RetinaFilterMap *ebpf.Map `ebpf:"retina_filter_map"` } func (m *packetparserMaps) Close() error { return _PacketparserClose( m.PacketparserEvents, + m.RetinaConntrackMap, m.RetinaFilterMap, ) } diff --git a/pkg/plugin/packetparser/packetparser_linux.go b/pkg/plugin/packetparser/packetparser_linux.go index a1821cd59a..4737ff62dd 100644 --- a/pkg/plugin/packetparser/packetparser_linux.go +++ b/pkg/plugin/packetparser/packetparser_linux.go @@ -15,6 +15,7 @@ import ( "sync" "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/wrapperspb" "github.com/cilium/cilium/api/v1/flow" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" @@ -46,8 +47,7 @@ import ( "golang.org/x/sys/unix" ) -//go:generate go run github.com/cilium/ebpf/cmd/bpf2go@master -cflags "-g -O2 -Wall -D__TARGET_ARCH_${GOARCH} -Wall" -target ${GOARCH} -type packet packetparser ./_cprog/packetparser.c -- -I../lib/_${GOARCH} -I../lib/common/libbpf/_src -I../filter/_cprog/ - +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go@master -cflags "-g -O2 -Wall -D__TARGET_ARCH_${GOARCH} -Wall" -target ${GOARCH} -type packet packetparser ./_cprog/packetparser.c -- -I../lib/_${GOARCH} -I../lib/common/libbpf/_src -I../lib/common/libbpf/_include/linux -I../lib/common/libbpf/_include/uapi/linux -I../lib/common/libbpf/_include/asm -I../filter/_cprog/ -I../conntrack/_cprog/ var errNoOutgoingLinks = errors.New("could not determine any outgoing links") // New creates a packetparser plugin. @@ -70,16 +70,17 @@ func (p *packetParser) Generate(ctx context.Context) error { } dir := path.Dir(filename) dynamicHeaderPath := fmt.Sprintf("%s/%s/%s", dir, bpfSourceDir, dynamicHeaderFileName) - i := 0 + // Check if packetparser will bypassing lookup IP of interest. + bypassLookupIPOfInterest := 0 if p.cfg.BypassLookupIPOfInterest { - p.l.Logger.Info("Bypassing lookup IP of interest") - i = 1 + p.l.Info("bypassing lookup IP of interest") + bypassLookupIPOfInterest = 1 } - st := fmt.Sprintf("#define BYPASS_LOOKUP_IP_OF_INTEREST %d \n", i) + p.l.Info("data aggregation level", zap.String("level", p.cfg.DataAggregationLevel.String())) + st := fmt.Sprintf("#define BYPASS_LOOKUP_IP_OF_INTEREST %d\n#define DATA_AGGREGATION_LEVEL %d\n", bypassLookupIPOfInterest, p.cfg.DataAggregationLevel) err := loader.WriteFile(ctx, dynamicHeaderPath, st) if err != nil { - p.l.Error("Error writing dynamic header", zap.Error(err)) - return err + return errors.Wrap(err, "failed to write dynamic header") } p.l.Info("PacketParser header generated at", zap.String("path", dynamicHeaderPath)) return nil @@ -95,15 +96,27 @@ func (p *packetParser) Compile(ctx context.Context) error { bpfSourceFile := fmt.Sprintf("%s/%s/%s", dir, bpfSourceDir, bpfSourceFileName) bpfOutputFile := fmt.Sprintf("%s/%s", dir, bpfObjectFileName) arch := runtime.GOARCH - includeDir := fmt.Sprintf("-I%s/../lib/_%s", dir, arch) + archLibDir := fmt.Sprintf("-I%s/../lib/_%s", dir, arch) filterDir := fmt.Sprintf("-I%s/../filter/_cprog/", dir) - libbpfDir := fmt.Sprintf("-I%s/../lib/common/libbpf/_src", dir) + conntrackDir := fmt.Sprintf("-I%s/../conntrack/_cprog/", dir) + libbpfSrcDir := fmt.Sprintf("-I%s/../lib/common/libbpf/_src", dir) + libbpfIncludeLinuxDir := fmt.Sprintf("-I%s/../lib/common/libbpf/_include/linux", dir) + libbpfIncludeUapiLinuxDir := fmt.Sprintf("-I%s/../lib/common/libbpf/_include/uapi/linux", dir) + libbpfIncludeAsmDir := fmt.Sprintf("-I%s/../lib/common/libbpf/_include/asm", dir) targetArch := "-D__TARGET_ARCH_x86" if arch == "arm64" { targetArch = "-D__TARGET_ARCH_arm64" } // Keep target as bpf, otherwise clang compilation yields bpf object that elf reader cannot load. - err = loader.CompileEbpf(ctx, "-target", "bpf", "-Wall", targetArch, "-g", "-O2", "-c", bpfSourceFile, "-o", bpfOutputFile, includeDir, libbpfDir, filterDir) + err = loader.CompileEbpf(ctx, "-target", "bpf", "-Wall", targetArch, "-g", "-O2", "-c", bpfSourceFile, "-o", bpfOutputFile, + archLibDir, + libbpfSrcDir, + libbpfIncludeAsmDir, + libbpfIncludeLinuxDir, + libbpfIncludeUapiLinuxDir, + filterDir, + conntrackDir, + ) if err != nil { return err } @@ -547,13 +560,14 @@ func (p *packetParser) processRecord(ctx context.Context, id int) { destinationPortShort := uint32(utils.HostToNetShort(bpfEvent.DstPort)) fl := utils.ToFlow( - ktime.MonotonicOffset.Nanoseconds()+int64(bpfEvent.Ts), + p.l, + ktime.MonotonicOffset.Nanoseconds()+int64(bpfEvent.T_nsec), utils.Int2ip(bpfEvent.SrcIp).To4(), // Precautionary To4() call. utils.Int2ip(bpfEvent.DstIp).To4(), // Precautionary To4() call. sourcePortShort, destinationPortShort, bpfEvent.Proto, - bpfEvent.Dir, + bpfEvent.ObservationPoint, flow.Verdict_FORWARDED, ) if fl == nil { @@ -561,6 +575,12 @@ func (p *packetParser) processRecord(ctx context.Context, id int) { continue } + // Add the isReply flag to the flow. + fl.IsReply = &wrapperspb.BoolValue{Value: bpfEvent.IsReply} + + // Add the traffic direction to the flow. + fl.TrafficDirection = flow.TrafficDirection(bpfEvent.TrafficDirection) + meta := &utils.RetinaMetadata{} // Add packet size to the flow's metadata. @@ -568,13 +588,21 @@ func (p *packetParser) processRecord(ctx context.Context, id int) { // Add the TCP metadata to the flow. tcpMetadata := bpfEvent.TcpMetadata - utils.AddTCPFlags(fl, tcpMetadata.Syn, tcpMetadata.Ack, tcpMetadata.Fin, tcpMetadata.Rst, tcpMetadata.Psh, tcpMetadata.Urg) + utils.AddTCPFlags( + fl, + uint16((bpfEvent.Flags&TCPFlagSYN)>>1), + uint16((bpfEvent.Flags&TCPFlagACK)>>4), // nolint:gomnd // 4 is the offset for ACK. + uint16((bpfEvent.Flags&TCPFlagFIN)>>0), + uint16((bpfEvent.Flags&TCPFlagRST)>>2), // nolint:gomnd // 2 is the offset for RST. + uint16((bpfEvent.Flags&TCPFlagPSH)>>3), // nolint:gomnd // 3 is the offset for PSH. + uint16((bpfEvent.Flags&TCPFlagURG)>>5), // nolint:gomnd // 5 is the offset for URG. + ) // For packets originating from node, we use tsval as the tcpID. // Packets coming back has the tsval echoed in tsecr. - if fl.TraceObservationPoint == flow.TraceObservationPoint_TO_NETWORK { + if fl.GetTraceObservationPoint() == flow.TraceObservationPoint_TO_NETWORK { utils.AddTCPID(meta, uint64(tcpMetadata.Tsval)) - } else if fl.TraceObservationPoint == flow.TraceObservationPoint_FROM_NETWORK { + } else if fl.GetTraceObservationPoint() == flow.TraceObservationPoint_FROM_NETWORK { utils.AddTCPID(meta, uint64(tcpMetadata.Tsecr)) } diff --git a/pkg/plugin/packetparser/packetparser_linux_test.go b/pkg/plugin/packetparser/packetparser_linux_test.go index 95251f9365..3fd55b8fc2 100644 --- a/pkg/plugin/packetparser/packetparser_linux_test.go +++ b/pkg/plugin/packetparser/packetparser_linux_test.go @@ -308,12 +308,12 @@ func TestReadDataPodLevelEnabled(t *testing.T) { defer ctrl.Finish() bpfEvent := &packetparserPacket{ //nolint:typecheck - SrcIp: uint32(83886272), // 192.0.0.5 - DstIp: uint32(16777226), // 10.0.0.1 - Proto: uint8(6), // TCP - Dir: uint32(1), // TO Endpoint - SrcPort: uint16(80), - DstPort: uint16(443), + SrcIp: uint32(83886272), // 192.0.0.5 + DstIp: uint32(16777226), // 10.0.0.1 + Proto: uint8(6), // TCP + ObservationPoint: uint8(1), // TO Endpoint + SrcPort: uint16(80), + DstPort: uint16(443), } bytes, _ := json.Marshal(bpfEvent) record := perf.Record{ @@ -532,7 +532,7 @@ func TestPacketParseGenerate(t *testing.T) { t.Fatalf("dynamic header file does not exist: %v", err) } - expectedContents := "#define BYPASS_LOOKUP_IP_OF_INTEREST 1 \n" + expectedContents := "#define BYPASS_LOOKUP_IP_OF_INTEREST 1\n#define DATA_AGGREGATION_LEVEL 0\n" actualContents, err := os.ReadFile(dynamicHeaderPath) if err != nil { t.Fatalf("failed to read dynamic header file: %v", err) diff --git a/pkg/plugin/packetparser/types_linux.go b/pkg/plugin/packetparser/types_linux.go index c5fa2fcc97..27709f4ec7 100644 --- a/pkg/plugin/packetparser/types_linux.go +++ b/pkg/plugin/packetparser/types_linux.go @@ -19,6 +19,17 @@ import ( "github.com/microsoft/retina/pkg/plugin/api" ) +const ( + TCPFlagFIN = 1 << iota + TCPFlagSYN + TCPFlagRST + TCPFlagPSH + TCPFlagACK + TCPFlagURG + TCPFlagECE + TCPFlagCWR +) + const ( Name api.PluginName = "packetparser" toEndpoint string = "toEndpoint" diff --git a/pkg/plugin/tcpretrans/tcpretrans_linux.go b/pkg/plugin/tcpretrans/tcpretrans_linux.go index 90d494ec71..837d3f0db3 100644 --- a/pkg/plugin/tcpretrans/tcpretrans_linux.go +++ b/pkg/plugin/tcpretrans/tcpretrans_linux.go @@ -118,6 +118,7 @@ func (t *tcpretrans) eventHandler(event *types.Event) { // TODO add metric here or add a enriched value fl := utils.ToFlow( + t.l, int64(event.Timestamp), net.ParseIP(event.SrcEndpoint.L3Endpoint.Addr).To4(), // Precautionary To4() call. net.ParseIP(event.DstEndpoint.L3Endpoint.Addr).To4(), // Precautionary To4() call. diff --git a/pkg/utils/flow_utils.go b/pkg/utils/flow_utils.go index 2bc41333c6..cffbeed2f9 100644 --- a/pkg/utils/flow_utils.go +++ b/pkg/utils/flow_utils.go @@ -31,22 +31,15 @@ const ( // 2 is from host to network and 3 is from network to host. // ts is the timestamp in nanoseconds. func ToFlow( + l *log.ZapLogger, ts int64, sourceIP, destIP net.IP, sourcePort, destPort uint32, proto uint8, - observationPoint uint32, + observationPoint uint8, verdict flow.Verdict, ) *flow.Flow { //nolint:typecheck - var ( - l4 *flow.Layer4 - checkpoint flow.TraceObservationPoint - direction flow.TrafficDirection - subeventtype int - ) - - l := log.Logger().Named("ToFlow") - + var l4 *flow.Layer4 switch proto { case 6: l4 = &flow.Layer4{ @@ -68,23 +61,28 @@ func ToFlow( } } + var ( + checkpoint flow.TraceObservationPoint + subeventtype int + direction flow.TrafficDirection + ) // We are attaching the filters to the veth interface on the host side. // So for HOST -> CONTAINER, egress of host veth is ingress of container. // Hence, we need to swap the direction. switch observationPoint { - case uint32(0): + case uint8(0): //nolint:gomnd // flow.TraceObservationPoint_TO_STACK checkpoint = flow.TraceObservationPoint_TO_STACK direction = flow.TrafficDirection_EGRESS subeventtype = int(api.TraceToStack) - case uint32(1): + case uint8(1): //nolint:gomnd // flow.TraceObservationPoint_TO_ENDPOINT checkpoint = flow.TraceObservationPoint_TO_ENDPOINT direction = flow.TrafficDirection_INGRESS subeventtype = int(api.TraceToLxc) - case uint32(2): + case uint8(2): //nolint:gomnd // flow.TraceObservationPoint_FROM_NETWORK checkpoint = flow.TraceObservationPoint_FROM_NETWORK direction = flow.TrafficDirection_INGRESS subeventtype = int(api.TraceFromNetwork) - case uint32(3): + case uint8(3): //nolint:gomnd // flow.TraceObservationPoint_TO_NETWORK checkpoint = flow.TraceObservationPoint_TO_NETWORK direction = flow.TrafficDirection_EGRESS subeventtype = int(api.TraceToNetwork) @@ -113,10 +111,13 @@ func ToFlow( }, L4: l4, TraceObservationPoint: checkpoint, - TrafficDirection: direction, - Verdict: verdict, - Extensions: ext, - IsReply: &wrapperspb.BoolValue{Value: false}, // Setting false by default as we don't have a better way to determine flow direction. + // Packetparser running with conntrack can determine the traffic direction correctly and will override this value. + TrafficDirection: direction, + Verdict: verdict, + Extensions: ext, + // Setting IsReply to false by default. + // Packetparser running with conntrack can determine the direction of the flow, and will override this value. + IsReply: &wrapperspb.BoolValue{Value: false}, } if t, err := decodeTime(ts); err == nil { f.Time = t diff --git a/pkg/utils/metadata_linux.pb.go b/pkg/utils/metadata_linux.pb.go index c7be0e4a4f..28b2717b17 100644 --- a/pkg/utils/metadata_linux.pb.go +++ b/pkg/utils/metadata_linux.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v3.21.12 +// protoc-gen-go v1.34.2 +// protoc v3.19.1 // source: metadata_linux.proto package utils @@ -263,7 +263,7 @@ func file_metadata_linux_proto_rawDescGZIP() []byte { var file_metadata_linux_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_metadata_linux_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_metadata_linux_proto_goTypes = []interface{}{ +var file_metadata_linux_proto_goTypes = []any{ (DNSType)(0), // 0: utils.DNSType (DropReason)(0), // 1: utils.DropReason (*RetinaMetadata)(nil), // 2: utils.RetinaMetadata @@ -284,7 +284,7 @@ func file_metadata_linux_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_metadata_linux_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_metadata_linux_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*RetinaMetadata); i { case 0: return &v.state diff --git a/pkg/utils/metadata_windows.pb.go b/pkg/utils/metadata_windows.pb.go index dba2e73d75..1347fb144a 100644 --- a/pkg/utils/metadata_windows.pb.go +++ b/pkg/utils/metadata_windows.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.32.0 -// protoc v3.21.12 +// protoc-gen-go v1.34.2 +// protoc v3.19.1 // source: metadata_windows.proto package utils @@ -111,9 +111,7 @@ const ( DropReason_Drop_FilteredIsolationUntagged DropReason = 36 DropReason_Drop_InvalidPDQueue DropReason = 37 DropReason_Drop_LowPower DropReason = 38 - // // General errors - // DropReason_Drop_Pause DropReason = 201 DropReason_Drop_Reset DropReason = 202 DropReason_Drop_SendAborted DropReason = 203 @@ -141,16 +139,12 @@ const ( DropReason_Drop_UnallowedEtherType DropReason = 226 DropReason_Drop_VportDown DropReason = 227 DropReason_Drop_SteeringMismatch DropReason = 228 - // // NetVsc errors - // DropReason_Drop_MicroportError DropReason = 401 DropReason_Drop_VfNotReady DropReason = 402 DropReason_Drop_MicroportNotReady DropReason = 403 DropReason_Drop_VMBusError DropReason = 404 - // // Tcpip FL errors - // DropReason_Drop_FL_LoopbackPacket DropReason = 601 DropReason_Drop_FL_InvalidSnapHeader DropReason = 602 DropReason_Drop_FL_InvalidEthernetType DropReason = 603 @@ -166,9 +160,7 @@ const ( DropReason_Drop_FL_NoClientInterface DropReason = 613 DropReason_Drop_FL_TooManyNetBuffers DropReason = 614 DropReason_Drop_FL_FlsNpiClientDrop DropReason = 615 - // // VFP errors - // DropReason_Drop_ArpGuard DropReason = 701 DropReason_Drop_ArpLimiter DropReason = 702 DropReason_Drop_DhcpLimiter DropReason = 703 @@ -188,9 +180,7 @@ const ( DropReason_Drop_NDPGuard DropReason = 717 DropReason_Drop_PortBlocked DropReason = 718 DropReason_Drop_NicSuspended DropReason = 719 - // // Tcpip NL errors - // DropReason_Drop_NL_BadSourceAddress DropReason = 901 DropReason_Drop_NL_NotLocallyDestined DropReason = 902 DropReason_Drop_NL_ProtocolUnreachable DropReason = 903 @@ -293,9 +283,7 @@ const ( DropReason_Drop_NL_SourceViolation DropReason = 1000 DropReason_Drop_NL_IcmpJumbogram DropReason = 1001 DropReason_Drop_NL_SwUsoFailure DropReason = 1002 - // // INET discard reasons - // DropReason_Drop_INET_SourceUnspecified DropReason = 1200 DropReason_Drop_INET_DestinationMulticast DropReason = 1201 DropReason_Drop_INET_HeaderInvalid DropReason = 1202 @@ -329,9 +317,7 @@ const ( DropReason_Drop_INET_SynAttack DropReason = 1230 DropReason_Drop_INET_AcceptInspection DropReason = 1231 DropReason_Drop_INET_AcceptRedirection DropReason = 1232 - // // Slbmux Error - // DropReason_Drop_SlbMux_ParsingFailure DropReason = 1301 DropReason_Drop_SlbMux_FirstFragmentMiss DropReason = 1302 DropReason_Drop_SlbMux_ICMPErrorPayloadValidationFailure DropReason = 1303 @@ -359,9 +345,7 @@ const ( DropReason_Drop_SlbMux_InvalidDiagPacketEncapType DropReason = 1325 DropReason_Drop_SlbMux_DiagPacketIsRedirect DropReason = 1326 DropReason_Drop_SlbMux_UnableToHandleRedirect DropReason = 1327 - // // Ipsec Errors - // DropReason_Drop_Ipsec_BadSpi DropReason = 1401 DropReason_Drop_Ipsec_SALifetimeExpired DropReason = 1402 DropReason_Drop_Ipsec_WrongSA DropReason = 1403 @@ -380,9 +364,7 @@ const ( DropReason_Drop_Ipsec_Dosp_MaxPerIpRateLimitQueues DropReason = 1416 DropReason_Drop_Ipsec_NoMemory DropReason = 1417 DropReason_Drop_Ipsec_Unsuccessful DropReason = 1418 - // // NetCx Drop Reasons - // DropReason_Drop_NetCx_NetPacketLayoutParseFailure DropReason = 1501 DropReason_Drop_NetCx_SoftwareChecksumFailure DropReason = 1502 DropReason_Drop_NetCx_NicQueueStop DropReason = 1503 @@ -390,14 +372,10 @@ const ( DropReason_Drop_NetCx_LSOFailure DropReason = 1505 DropReason_Drop_NetCx_USOFailure DropReason = 1506 DropReason_Drop_NetCx_BufferBounceFailureAndPacketIgnore DropReason = 1507 - // // Http errors 3000 - 4000. // These must be in sync with cmd\resource.h - // DropReason_Drop_Http_Begin DropReason = 3000 - // // UlErrors - // DropReason_Drop_Http_UlError_Begin DropReason = 3001 DropReason_Drop_Http_UlError DropReason = 3002 DropReason_Drop_Http_UlErrorVerb DropReason = 3003 @@ -492,13 +470,9 @@ const ( DropReason_Drop_Http_UxDuoFaultContentLengthDisallowed DropReason = 3461 DropReason_Drop_Http_UxDuoFaultTrailerDisallowed DropReason = 3462 DropReason_Drop_Http_UxDuoFaultEnd DropReason = 3463 - // - // WSK layer drops - // + // WSK layer drops DropReason_Drop_Http_ReceiveSuppressed DropReason = 3600 - // - // Http/SSL layer drops - // + // Http/SSL layer drops DropReason_Drop_Http_Generic DropReason = 3800 DropReason_Drop_Http_InvalidParameter DropReason = 3801 DropReason_Drop_Http_InsufficientResources DropReason = 3802 @@ -2326,7 +2300,7 @@ func file_metadata_windows_proto_rawDescGZIP() []byte { var file_metadata_windows_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_metadata_windows_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_metadata_windows_proto_goTypes = []interface{}{ +var file_metadata_windows_proto_goTypes = []any{ (DNSType)(0), // 0: utils.DNSType (DropReason)(0), // 1: utils.DropReason (*RetinaMetadata)(nil), // 2: utils.RetinaMetadata @@ -2347,7 +2321,7 @@ func file_metadata_windows_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_metadata_windows_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_metadata_windows_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*RetinaMetadata); i { case 0: return &v.state diff --git a/pkg/utils/utils_linux_test.go b/pkg/utils/utils_linux_test.go index b3a94ce313..b672033496 100644 --- a/pkg/utils/utils_linux_test.go +++ b/pkg/utils/utils_linux_test.go @@ -17,12 +17,12 @@ import ( ) func TestToFlow(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) + l, _ := log.SetupZapLogger(log.GetDefaultLogOpts()) ts := int64(1649748687588860) - f := ToFlow(ts, net.ParseIP("1.1.1.1").To4(), + f := ToFlow(l, ts, net.ParseIP("1.1.1.1").To4(), net.ParseIP("2.2.2.2").To4(), - 443, 80, 6, uint32(1), flow.Verdict_FORWARDED) + 443, 80, 6, uint8(1), flow.Verdict_FORWARDED) /* expected ---> flow.Flow{ IP: &flow.IP{ @@ -64,26 +64,27 @@ func TestToFlow(t *testing.T) { } expectedSubtype := []int32{3, 0, 10, 11, 0} for idx, val := range []uint32{0, 1, 2, 3, 4} { - f = ToFlow(ts, net.ParseIP("1.1.1.1").To4(), + f = ToFlow(l, ts, net.ParseIP("1.1.1.1").To4(), net.ParseIP("2.2.2.2").To4(), - 443, 80, 6, uint32(val), flow.Verdict_FORWARDED) + 443, 80, 6, uint8(val), flow.Verdict_FORWARDED) assert.EqualValues(t, f.TraceObservationPoint, expectedObsPoint[idx]) assert.EqualValues(t, f.GetEventType().GetSubType(), expectedSubtype[idx]) } } func TestAddPacketSize(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) + l, _ := log.SetupZapLogger(log.GetDefaultLogOpts()) ts := int64(1649748687588864) fl := ToFlow( + l, ts, net.ParseIP("1.1.1.1").To4(), net.ParseIP("2.2.2.2").To4(), 443, 80, 6, - uint32(1), + uint8(1), flow.Verdict_FORWARDED, ) meta := &RetinaMetadata{} @@ -95,17 +96,18 @@ func TestAddPacketSize(t *testing.T) { } func TestTcpID(t *testing.T) { - log.SetupZapLogger(log.GetDefaultLogOpts()) + l, _ := log.SetupZapLogger(log.GetDefaultLogOpts()) ts := int64(1649748687588864) fl := ToFlow( + l, ts, net.ParseIP("1.1.1.1").To4(), net.ParseIP("2.2.2.2").To4(), 443, 80, 6, - uint32(1), + uint8(1), flow.Verdict_FORWARDED, ) @@ -118,7 +120,7 @@ func TestTcpID(t *testing.T) { func TestAddDropReason(t *testing.T) { testCases := []struct { name string - dropReason uint32 + dropReason uint16 expectedDesc flow.DropReason expectedReason uint32 expectedRetinaReason string