diff --git a/.pipelines/cg-pipeline.yaml b/.pipelines/cg-pipeline.yaml index 9c5a000bdf..70c57f9be6 100644 --- a/.pipelines/cg-pipeline.yaml +++ b/.pipelines/cg-pipeline.yaml @@ -135,39 +135,6 @@ stages: pathtoPublish: "$(Build.ArtifactStagingDirectory)" condition: succeeded() - # windows 2019 only in buildx, windows server 2022 requires native windows container build because of cgo - - job: retinaagentimageswin2019 - displayName: Build Retina Windows Images (buildx) - pool: - name: "$(BUILD_POOL_NAME_DEFAULT)" - strategy: - matrix: - windows-ltsc2019: - platform: "windows" - arch: "amd64" - year: "2019" - - steps: - - checkout: self - fetchTags: true - - script: | - set -euo pipefail - echo "VERSION=$(make version)" - export VERSION=$(make version) - mkdir -p ./output/images/$(platform)/$(arch)/$(year) - make retina-image-win \ - TARGET=final \ - WINDOWS_YEARS=$(year) \ - TAG=$(make version) \ - BUILDX_ACTION="-o type=docker,dest=./output/images/$(platform)/$(arch)/$(year)/retina-agent-$VERSION-windows-ltsc$(year)-$(arch).tar" - displayName: "Build Retina Windows Image" - - - task: PublishBuildArtifacts@1 - inputs: - artifactName: output - pathtoPublish: ./output - condition: succeeded() - - job: windowsnative displayName: Build Retina Windows Images (native) pool: @@ -176,12 +143,29 @@ stages: - checkout: self fetchTags: true - - task: Docker@2 - displayName: Docker Login + + + - task: DownloadPipelineArtifact@2 inputs: - containerRegistry: $(WINDOWS_BUILDER_REGISTRY) - command: "login" - addPipelineData: false + buildType: 'specific' + project: $(BUILDER_ADO_PROECT) + definition: $(BUILDER_ADO_DEFINITION_ID) # Replace with your build definition ID + buildId: $(BUILDER_ADO_BUILD_ID) + artifactName: $(BUILDER_ADO_ARTIFACTE_NAME) # Replace with your artifact name + itemPattern: '**/*builder*.tar' + downloadPath: '$(Pipeline.Workspace)\artifacts' + + - task: PowerShell@2 + displayName: "Load Builder Image" + inputs: + targetType: "inline" + script: | + $rootDir = "$(Pipeline.Workspace)\artifacts" + $dockerImages = Get-ChildItem -Path $rootDir -Recurse -Filter *.tar + foreach ($image in $dockerImages) { + Write-Host "Loading Docker image: $($image.FullName)" + docker load -i $image.FullName + } - task: PowerShell@2 displayName: "Build Retina Windows Image (LTSC2022)" diff --git a/controller/Dockerfile.windows-native b/controller/Dockerfile.windows-native index c04687b53b..d1afbda0a2 100644 --- a/controller/Dockerfile.windows-native +++ b/controller/Dockerfile.windows-native @@ -7,9 +7,8 @@ FROM --platform=windows/amd64 mcr.microsoft.com/oss/go/microsoft/golang:1.22-wi WORKDIR C:\\retina COPY go.mod . COPY go.sum . -ENV CGO_ENABLED=1 +ENV CGO_ENABLED=0 RUN go mod download -RUN go mod verify ADD . . ARG VERSION ARG APP_INSIGHTS_ID diff --git a/controller/Dockerfile.windows-native.dockerignore b/controller/Dockerfile.windows-native.dockerignore index 8a72f6a456..4d5f04e6f9 100644 --- a/controller/Dockerfile.windows-native.dockerignore +++ b/controller/Dockerfile.windows-native.dockerignore @@ -1,2 +1,3 @@ pkg/plugin/windows/pktmon/packetmonitorsupport/* *.tar + \ No newline at end of file diff --git a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go index 053f492271..ec6940aaf7 100644 --- a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go +++ b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go @@ -8,7 +8,6 @@ import ( "os" "os/exec" - "github.com/cilium/cilium/api/v1/flow" observerv1 "github.com/cilium/cilium/api/v1/observer" v1 "github.com/cilium/cilium/pkg/hubble/api/v1" kcfg "github.com/microsoft/retina/pkg/config" @@ -21,18 +20,21 @@ import ( "go.uber.org/zap/zapio" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) var ( ErrNilEnricher = errors.New("enricher is nil") ErrUnexpectedExit = errors.New("unexpected exit") + ErrNilGrpcClient = errors.New("grpc client is nil") socket = "/temp/retina-pktmon.sock" ) const ( Name = "pktmon" - connectionRetryAttempts = 3 + connectionRetryAttempts = 5 + eventChannelSize = 1000 ) type Plugin struct { @@ -42,6 +44,9 @@ type Plugin struct { pktmonCmd *exec.Cmd stdWriter *zapio.Writer errWriter *zapio.Writer + + grpcClient *GRPCClient + stream observerv1.Observer_GetFlowsClient } func (p *Plugin) Init() error { @@ -52,11 +57,11 @@ func (p *Plugin) Name() string { return "pktmon" } -type Client struct { +type GRPCClient struct { observerv1.ObserverClient } -func NewClient() (*Client, error) { +func newGRPCClient() (*GRPCClient, error) { retryPolicy := map[string]any{ "methodConfig": []map[string]any{ { @@ -84,15 +89,15 @@ func NewClient() (*Client, error) { return nil, fmt.Errorf("failed to dial pktmon server: %w", err) } - return &Client{observerv1.NewObserverClient(conn)}, nil + return &GRPCClient{observerv1.NewObserverClient(conn)}, nil } -func (p *Plugin) RunPktMonServer() error { +func (p *Plugin) RunPktMonServer(ctx context.Context) error { p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} defer p.stdWriter.Close() p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} defer p.errWriter.Close() - p.pktmonCmd = exec.Command("controller-pktmon.exe") + p.pktmonCmd = exec.CommandContext(ctx, "controller-pktmon.exe") p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) p.pktmonCmd.Env = os.Environ() p.pktmonCmd.Stdout = p.stdWriter @@ -111,103 +116,122 @@ func (p *Plugin) RunPktMonServer() error { } func (p *Plugin) Start(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - p.enricher = enricher.Instance() if p.enricher == nil { return ErrNilEnricher } go func() { - err := p.RunPktMonServer() + err := p.RunPktMonServer(ctx) if err != nil { - p.l.Error("failed to run pktmon server", zap.Error(err)) + p.l.Error("pktmon server exited", zap.Error(err)) } - - // if the pktmon server process exits, cancel the context, we need to crash - cancel() }() - var str observerv1.Observer_GetFlowsClient + err := p.SetupStream() + if err != nil { + return fmt.Errorf("failed to setup initial pktmon stream: %w", err) + } + + // run the getflows loop + for { + err := p.GetFlow(ctx) + if _, ok := status.FromError(err); ok { + p.l.Error("failed to get flow, retriable:", zap.Error(err)) + continue + } + return fmt.Errorf("failed to get flow, unrecoverable: %w", err) + } +} + +func (p *Plugin) SetupStream() error { + var err error fn := func() error { p.l.Info("creating pktmon client") - client, err := NewClient() + p.grpcClient, err = newGRPCClient() if err != nil { return fmt.Errorf("failed to create pktmon client before getting flows: %w", err) } - str, err = client.GetFlows(ctx, &observerv1.GetFlowsRequest{}) + return nil + } + err = utils.Retry(fn, connectionRetryAttempts) + if err != nil { + return fmt.Errorf("failed to create pktmon client: %w", err) + } + + return nil +} + +func (p *Plugin) StartStream(ctx context.Context) error { + if p.grpcClient == nil { + return fmt.Errorf("unable to start stream: %w", ErrNilGrpcClient) + } + + var err error + fn := func() error { + p.stream, err = p.grpcClient.GetFlows(ctx, &observerv1.GetFlowsRequest{}) if err != nil { return fmt.Errorf("failed to open pktmon stream: %w", err) } return nil } - err := utils.Retry(fn, connectionRetryAttempts) + err = utils.Retry(fn, connectionRetryAttempts) if err != nil { return fmt.Errorf("failed to create pktmon client: %w", err) } + return nil +} + +func (p *Plugin) GetFlow(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + err := p.StartStream(ctx) + if err != nil { + return fmt.Errorf("failed to setup pktmon stream: %w", err) + } + for { select { case <-ctx.Done(): - return fmt.Errorf("pktmon context cancelled: %w", ctx.Err()) + return fmt.Errorf("pktmon plugin context done: %w", ctx.Err()) default: - err := p.GetFlow(str) + event, err := p.stream.Recv() if err != nil { - return fmt.Errorf("failed to get flow from observer: %w", err) + return fmt.Errorf("failed to receive pktmon event: %w", err) } - } - } -} - -func (p *Plugin) GetFlow(str observerv1.Observer_GetFlowsClient) error { - event, err := str.Recv() - if err != nil { - return fmt.Errorf("failed to receive pktmon event: %w", err) - } - fl := event.GetFlow() - if fl == nil { - p.l.Error("received nil flow, flow proto mismatch from client/server?") - return nil - } + fl := event.GetFlow() + if fl == nil { + p.l.Error("received nil flow, flow proto mismatch from client/server?") + return nil + } - ev := &v1.Event{ - Event: fl, - Timestamp: fl.GetTime(), - } + ev := &v1.Event{ + Event: fl, + Timestamp: fl.GetTime(), + } - if fl.GetType() == flow.FlowType_L7 { - dns := fl.GetL7().GetDns() - if dns != nil { - query := dns.GetQuery() - ans := dns.GetIps() - if dns.GetQtypes()[0] == "Q" { - p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query) + if p.enricher != nil { + p.enricher.Write(ev) } else { - p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans) + p.l.Error("enricher is nil when writing event") } - } - } - if p.enricher != nil { - p.enricher.Write(ev) - } else { - p.l.Error("enricher is nil when writing event") - } - - // Write the event to the external channel. - if p.externalChannel != nil { - select { - case p.externalChannel <- ev: - default: - // Channel is full, drop the event. - // We shouldn't slow down the reader. - metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc() + // Write the event to the external channel. + if p.externalChannel != nil { + select { + case p.externalChannel <- ev: + default: + // Channel is full, drop the event. + // We shouldn't slow down the reader. + metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc() + } + } } } - return nil } func (p *Plugin) SetupChannel(ch chan *v1.Event) error {