Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: LRU cache for flows in packetparser #590

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions pkg/plugin/packetparser/packetparser_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"path"
"runtime"
"sync"
"time"

"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/cilium/cilium/api/v1/flow"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
Expand Down Expand Up @@ -181,6 +184,9 @@ func (p *packetParser) Init() error {
p.tcMap = &sync.Map{}
p.interfaceLockMap = &sync.Map{}

// Initialize flow cache.
p.flowCache = expirable.NewLRU[flowCacheKey, *flow.Flow](flowCacheSize, nil, flowCacheTTL)

return nil
}

Expand Down Expand Up @@ -534,13 +540,6 @@ func (p *packetParser) processRecord(ctx context.Context, id int) {
p.l.Info("Context is done, stopping Worker", zap.Int("worker_id", id))
return
case record := <-p.recordsChannel:
p.l.Debug("Received record",
zap.Int("cpu", record.CPU),
zap.Uint64("lost_samples", record.LostSamples),
zap.Int("bytes_remaining", record.Remaining),
zap.Int("worker_id", id),
)

var bpfEvent packetparserPacket
err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &bpfEvent)
if err != nil {
Expand All @@ -553,19 +552,42 @@ func (p *packetParser) processRecord(ctx context.Context, id int) {
sourcePortShort := uint32(utils.HostToNetShort(bpfEvent.SrcPort))
destinationPortShort := uint32(utils.HostToNetShort(bpfEvent.DstPort))

fl := utils.ToFlow(
ktime.MonotonicOffset.Nanoseconds()+int64(bpfEvent.Ts),
utils.Int2ip(bpfEvent.SrcIp).To4(), // Precautionary To4() call.
utils.Int2ip(bpfEvent.DstIp).To4(), // Precautionary To4() call.
sourcePortShort,
destinationPortShort,
bpfEvent.Proto,
bpfEvent.Dir,
flow.Verdict_FORWARDED,
)
if fl == nil {
p.l.Warn("Could not convert bpfEvent to flow", zap.Any("bpfEvent", bpfEvent))
continue
// Check if the flow exists in the cache.
key := flowCacheKey{
srcIP: bpfEvent.SrcIp,
dstIP: bpfEvent.DstIp,
srcPort: sourcePortShort,
dstPort: destinationPortShort,
proto: bpfEvent.Proto,
dir: bpfEvent.Dir,
}

var fl *flow.Flow
fl, ok := p.flowCache.Get(key)
if !ok {
fl = utils.ToFlow(
ktime.MonotonicOffset.Nanoseconds()+int64(bpfEvent.Ts),
utils.Int2ip(bpfEvent.SrcIp).To4(), // Precautionary To4() call.
utils.Int2ip(bpfEvent.DstIp).To4(), // Precautionary To4() call.
sourcePortShort,
destinationPortShort,
bpfEvent.Proto,
bpfEvent.Dir,
flow.Verdict_FORWARDED,
)
if fl == nil {
p.l.Warn("Could not convert bpfEvent to flow", zap.Any("bpfEvent", bpfEvent))
continue
}
// Add the flow to the cache.
p.flowCache.Add(key, fl)
} else {
// Update the flow's time.
if t, err := decodeTime(ktime.MonotonicOffset.Nanoseconds() + int64(bpfEvent.Ts)); err == nil {
fl.Time = t
} else {
p.l.Warn("Failed to get current time", zap.Error(err))
}
}

meta := &utils.RetinaMetadata{}
Expand All @@ -579,9 +601,9 @@ func (p *packetParser) processRecord(ctx context.Context, id int) {

// For packets originating from node, we use tsval as the tcpID.
// Packets coming back has the tsval echoed in tsecr.
if fl.TraceObservationPoint == flow.TraceObservationPoint_TO_NETWORK {
if fl.GetTraceObservationPoint() == flow.TraceObservationPoint_TO_NETWORK {
utils.AddTCPID(meta, uint64(tcpMetadata.Tsval))
} else if fl.TraceObservationPoint == flow.TraceObservationPoint_FROM_NETWORK {
} else if fl.GetTraceObservationPoint() == flow.TraceObservationPoint_FROM_NETWORK {
utils.AddTCPID(meta, uint64(tcpMetadata.Tsecr))
}

Expand Down Expand Up @@ -664,3 +686,16 @@ func absPath() (string, error) {
dir := path.Dir(filename)
return dir, nil
}

// decodeTime converts nanoseconds to a protobuf timestamp.
func decodeTime(nanoseconds int64) (pbTime *timestamppb.Timestamp, err error) {
goTime, err := time.Parse(time.RFC3339Nano, time.Unix(0, nanoseconds).Format(time.RFC3339Nano))
if err != nil {
return nil, errors.Wrap(err, "failed to parse time")
}
pbTime = timestamppb.New(goTime)
if err = pbTime.CheckValid(); err != nil {
return nil, errors.Wrap(err, "invalid timestamp")
}
return pbTime, nil
}
18 changes: 16 additions & 2 deletions pkg/plugin/packetparser/types_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ package packetparser

import (
"sync"
"time"

kcfg "github.com/microsoft/retina/pkg/config"

"github.com/cilium/cilium/api/v1/flow"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/florianl/go-tc"
"github.com/vishvananda/netlink"

"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
"github.com/microsoft/retina/pkg/plugin/api"
"github.com/vishvananda/netlink"
)

const (
Expand All @@ -27,6 +29,8 @@ const (
Device string = "device"
workers int = 2
buffer int = 10000
flowCacheSize int = 1000
flowCacheTTL time.Duration = 6 * time.Minute
bpfSourceDir string = "_cprog"
bpfSourceFileName string = "packetparser.c"
bpfObjectFileName string = "packetparser_bpf.o"
Expand Down Expand Up @@ -57,6 +61,15 @@ type key struct {
netNs int
}

type flowCacheKey struct {
srcIP uint32
dstIP uint32
srcPort uint32
dstPort uint32
proto uint8
dir uint32
}

//go:generate go run go.uber.org/mock/[email protected] -source=types_linux.go -destination=mocks/mock_types.go -package=mocks

// Define the interfaces.
Expand Down Expand Up @@ -90,6 +103,7 @@ type packetParser struct {
cfg *kcfg.Config
l *log.ZapLogger
callbackID string
flowCache *expirable.LRU[flowCacheKey, *flow.Flow]
objs *packetparserObjects //nolint:typecheck
// tcMap is a map of key to *val.
tcMap *sync.Map
Expand Down
Loading